LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 92.3 % 1273 1175
Test Date: 2025-04-24 20:31:15 Functions: 94.3 % 88 83

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! frozen, and it is split up into new image and delta layers and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use std::collections::{HashMap, VecDeque};
      50              : use std::iter::Peekable;
      51              : use std::ops::Range;
      52              : use std::sync::Arc;
      53              : 
      54              : use anyhow::Result;
      55              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      56              : pub use historic_layer_coverage::LayerKey;
      57              : use pageserver_api::key::Key;
      58              : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
      59              : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
      60              : use tokio::sync::watch;
      61              : use utils::lsn::Lsn;
      62              : 
      63              : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
      64              : use crate::context::RequestContext;
      65              : use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak};
      66              : 
      67              : ///
      68              : /// LayerMap tracks what layers exist on a timeline.
      69              : ///
      70              : pub struct LayerMap {
      71              :     //
      72              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      73              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      74              :     // the start LSN of the next InMemoryLayer that is to be created.
      75              :     //
      76              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      77              :     pub next_open_layer_at: Option<Lsn>,
      78              : 
      79              :     ///
      80              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      81              :     /// are no longer added to, but haven't been written out to disk
      82              :     /// yet. They contain WAL older than the current 'open_layer' or
      83              :     /// 'next_open_layer_at', but newer than any historic layer.
      84              :     /// The frozen layers are in order from oldest to newest, so that
      85              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      86              :     /// in the 'front'.
      87              :     ///
      88              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      89              : 
      90              :     /// Index of the historic layers optimized for search
      91              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      92              : 
      93              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
      94              :     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
      95              :     ///
      96              :     /// NB: make sure to notify `watch_l0_deltas` on changes.
      97              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      98              : 
      99              :     /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
     100              :     watch_l0_deltas: watch::Sender<usize>,
     101              : }
     102              : 
     103              : impl Default for LayerMap {
     104         2832 :     fn default() -> Self {
     105         2832 :         Self {
     106         2832 :             open_layer: Default::default(),
     107         2832 :             next_open_layer_at: Default::default(),
     108         2832 :             frozen_layers: Default::default(),
     109         2832 :             historic: Default::default(),
     110         2832 :             l0_delta_layers: Default::default(),
     111         2832 :             watch_l0_deltas: watch::channel(0).0,
     112         2832 :         }
     113         2832 :     }
     114              : }
     115              : 
     116              : /// The primary update API for the layer map.
     117              : ///
     118              : /// Batching historic layer insertions and removals is good for
     119              : /// performance and this struct helps us do that correctly.
     120              : #[must_use]
     121              : pub struct BatchedUpdates<'a> {
     122              :     // While we hold this exclusive reference to the layer map the type checker
     123              :     // will prevent us from accidentally reading any unflushed updates.
     124              :     layer_map: &'a mut LayerMap,
     125              : }
     126              : 
     127              : /// Provide ability to batch more updates while hiding the read
     128              : /// API so we don't accidentally read without flushing.
     129              : impl BatchedUpdates<'_> {
     130              :     ///
     131              :     /// Insert an on-disk layer.
     132              :     ///
     133              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     134        29556 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     135        29556 :         self.layer_map.insert_historic_noflush(layer_desc)
     136        29556 :     }
     137              : 
     138              :     ///
     139              :     /// Remove an on-disk layer from the map.
     140              :     ///
     141              :     /// This should be called when the corresponding file on disk has been deleted.
     142              :     ///
     143         3096 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     144         3096 :         self.layer_map.remove_historic_noflush(layer_desc)
     145         3096 :     }
     146              : 
     147              :     // We will flush on drop anyway, but this method makes it
     148              :     // more explicit that there is some work being done.
     149              :     /// Apply all updates
     150        10884 :     pub fn flush(self) {
     151        10884 :         // Flush happens on drop
     152        10884 :     }
     153              : }
     154              : 
     155              : // Ideally the flush() method should be called explicitly for more
     156              : // controlled execution. But if we forget we'd rather flush on drop
     157              : // than panic later or read without flushing.
     158              : //
     159              : // TODO maybe warn if flush hasn't explicitly been called
     160              : impl Drop for BatchedUpdates<'_> {
     161        10884 :     fn drop(&mut self) {
     162        10884 :         self.layer_map.flush_updates();
     163        10884 :     }
     164              : }
     165              : 
     166              : /// Return value of LayerMap::search
     167              : #[derive(Eq, PartialEq, Debug, Hash)]
     168              : pub struct SearchResult {
     169              :     pub layer: ReadableLayerWeak,
     170              :     pub lsn_floor: Lsn,
     171              : }
     172              : 
     173              : /// Return value of [`LayerMap::range_search`]
     174              : ///
     175              : /// Contains a mapping from a layer description to a keyspace
     176              : /// accumulator that contains all the keys which intersect the layer
     177              : /// from the original search space.
     178              : #[derive(Debug)]
     179              : pub struct RangeSearchResult {
     180              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     181              : }
     182              : 
     183              : impl RangeSearchResult {
     184      6281842 :     fn new() -> Self {
     185      6281842 :         Self {
     186      6281842 :             found: HashMap::new(),
     187      6281842 :         }
     188      6281842 :     }
     189              : 
     190      2011716 :     fn map_to_in_memory_layer(
     191      2011716 :         in_memory_layer: Option<InMemoryLayerDesc>,
     192      2011716 :         range: Range<Key>,
     193      2011716 :     ) -> RangeSearchResult {
     194      2011716 :         match in_memory_layer {
     195       656084 :             Some(inmem) => {
     196       656084 :                 let search_result = SearchResult {
     197       656084 :                     lsn_floor: inmem.get_lsn_range().start,
     198       656084 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     199       656084 :                 };
     200       656084 : 
     201       656084 :                 let mut accum = KeySpaceAccum::new();
     202       656084 :                 accum.add_range(range);
     203       656084 :                 RangeSearchResult {
     204       656084 :                     found: HashMap::from([(search_result, accum)]),
     205       656084 :                 }
     206              :             }
     207      1355632 :             None => RangeSearchResult::new(),
     208              :         }
     209      2011716 :     }
     210              : }
     211              : 
     212              : /// Collector for results of range search queries on the LayerMap.
     213              : /// It should be provided with two iterators for the delta and image coverage
     214              : /// that contain all the changes for layers which intersect the range.
     215              : struct RangeSearchCollector<Iter>
     216              : where
     217              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     218              : {
     219              :     in_memory_layer: Option<InMemoryLayerDesc>,
     220              :     delta_coverage: Peekable<Iter>,
     221              :     image_coverage: Peekable<Iter>,
     222              :     key_range: Range<Key>,
     223              :     end_lsn: Lsn,
     224              : 
     225              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     226              :     current_image: Option<Arc<PersistentLayerDesc>>,
     227              : 
     228              :     result: RangeSearchResult,
     229              : }
     230              : 
     231              : #[derive(Debug)]
     232              : enum NextLayerType {
     233              :     Delta(i128),
     234              :     Image(i128),
     235              :     Both(i128),
     236              : }
     237              : 
     238              : impl NextLayerType {
     239      9805872 :     fn next_change_at_key(&self) -> Key {
     240      9805872 :         match self {
     241       917478 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     242      3872986 :             NextLayerType::Image(at) => Key::from_i128(*at),
     243      5015408 :             NextLayerType::Both(at) => Key::from_i128(*at),
     244              :         }
     245      9805872 :     }
     246              : }
     247              : 
     248              : impl<Iter> RangeSearchCollector<Iter>
     249              : where
     250              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     251              : {
     252      4883718 :     fn new(
     253      4883718 :         key_range: Range<Key>,
     254      4883718 :         end_lsn: Lsn,
     255      4883718 :         in_memory_layer: Option<InMemoryLayerDesc>,
     256      4883718 :         delta_coverage: Iter,
     257      4883718 :         image_coverage: Iter,
     258      4883718 :     ) -> Self {
     259      4883718 :         Self {
     260      4883718 :             in_memory_layer,
     261      4883718 :             delta_coverage: delta_coverage.peekable(),
     262      4883718 :             image_coverage: image_coverage.peekable(),
     263      4883718 :             key_range,
     264      4883718 :             end_lsn,
     265      4883718 :             current_delta: None,
     266      4883718 :             current_image: None,
     267      4883718 :             result: RangeSearchResult::new(),
     268      4883718 :         }
     269      4883718 :     }
     270              : 
     271              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     272              :     /// One pointer tracks the start of the current range and the other tracks
     273              :     /// the beginning of the next range which will overlap with the next change
     274              :     /// in coverage across both image and delta.
     275      4883718 :     fn collect(mut self) -> RangeSearchResult {
     276      4883718 :         let next_layer_type = self.choose_next_layer_type();
     277      4857018 :         let mut current_range_start = match next_layer_type {
     278              :             None => {
     279              :                 // No changes for the range
     280        26700 :                 self.pad_range(self.key_range.clone());
     281        26700 :                 return self.result;
     282              :             }
     283      4857018 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     284            0 :                 // Changes only after the end of the range
     285            0 :                 self.pad_range(self.key_range.clone());
     286            0 :                 return self.result;
     287              :             }
     288      4857018 :             Some(layer_type) => {
     289      4857018 :                 // Changes for the range exist.
     290      4857018 :                 let coverage_start = layer_type.next_change_at_key();
     291      4857018 :                 let range_before = self.key_range.start..coverage_start;
     292      4857018 :                 self.pad_range(range_before);
     293      4857018 : 
     294      4857018 :                 self.advance(&layer_type);
     295      4857018 :                 coverage_start
     296              :             }
     297              :         };
     298              : 
     299      9805872 :         while current_range_start < self.key_range.end {
     300      4948854 :             let next_layer_type = self.choose_next_layer_type();
     301      4948854 :             match next_layer_type {
     302        91836 :                 Some(t) => {
     303        91836 :                     let current_range_end = t.next_change_at_key();
     304        91836 :                     self.add_range(current_range_start..current_range_end);
     305        91836 :                     current_range_start = current_range_end;
     306        91836 : 
     307        91836 :                     self.advance(&t);
     308        91836 :                 }
     309      4857018 :                 None => {
     310      4857018 :                     self.add_range(current_range_start..self.key_range.end);
     311      4857018 :                     current_range_start = self.key_range.end;
     312      4857018 :                 }
     313              :             }
     314              :         }
     315              : 
     316      4857018 :         self.result
     317      4883718 :     }
     318              : 
     319              :     /// Map a range which does not intersect any persistent layers to
     320              :     /// the in-memory layer candidate.
     321      4889346 :     fn pad_range(&mut self, key_range: Range<Key>) {
     322      4889346 :         if !key_range.is_empty() {
     323        44160 :             if let Some(ref inmem) = self.in_memory_layer {
     324        19044 :                 let search_result = SearchResult {
     325        19044 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()),
     326        19044 :                     lsn_floor: inmem.get_lsn_range().start,
     327        19044 :                 };
     328        19044 : 
     329        19044 :                 self.result
     330        19044 :                     .found
     331        19044 :                     .entry(search_result)
     332        19044 :                     .or_default()
     333        19044 :                     .add_range(key_range);
     334        25116 :             }
     335      4845186 :         }
     336      4889346 :     }
     337              : 
     338              :     /// Select the appropriate layer for the given range and update
     339              :     /// the collector.
     340      4948854 :     fn add_range(&mut self, covered_range: Range<Key>) {
     341      4948854 :         let selected = LayerMap::select_layer(
     342      4948854 :             self.current_delta.clone(),
     343      4948854 :             self.current_image.clone(),
     344      4948854 :             self.in_memory_layer.clone(),
     345      4948854 :             self.end_lsn,
     346      4948854 :         );
     347      4948854 : 
     348      4948854 :         match selected {
     349      4943226 :             Some(search_result) => self
     350      4943226 :                 .result
     351      4943226 :                 .found
     352      4943226 :                 .entry(search_result)
     353      4943226 :                 .or_default()
     354      4943226 :                 .add_range(covered_range),
     355         5628 :             None => self.pad_range(covered_range),
     356              :         }
     357      4948854 :     }
     358              : 
     359              :     /// Move to the next coverage change.
     360      4948854 :     fn advance(&mut self, layer_type: &NextLayerType) {
     361      4948854 :         match layer_type {
     362       470691 :             NextLayerType::Delta(_) => {
     363       470691 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     364       470691 :                 self.current_delta = layer;
     365       470691 :             }
     366      1943327 :             NextLayerType::Image(_) => {
     367      1943327 :                 let (_, layer) = self.image_coverage.next().unwrap();
     368      1943327 :                 self.current_image = layer;
     369      1943327 :             }
     370      2534836 :             NextLayerType::Both(_) => {
     371      2534836 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     372      2534836 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     373      2534836 : 
     374      2534836 :                 self.current_image = image_layer;
     375      2534836 :                 self.current_delta = delta_layer;
     376      2534836 :             }
     377              :         }
     378      4948854 :     }
     379              : 
     380              :     /// Pick the next coverage change: the one at the lesser key or both if they're aligned.
     381      9832572 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     382      9832572 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     383      9832572 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     384      9832572 : 
     385      9832572 :         match (next_delta_at, next_image_at) {
     386      4883718 :             (None, None) => None,
     387       440439 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     388      1938095 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     389      2570320 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     390         5232 :                 Some(NextLayerType::Image(*next_image_at))
     391              :             }
     392      2565088 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     393        30252 :                 Some(NextLayerType::Delta(*next_delta_at))
     394              :             }
     395      2534836 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     396              :         }
     397      9832572 :     }
     398              : }
     399              : 
     400              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     401              : pub struct InMemoryLayerDesc {
     402              :     handle: InMemoryLayerHandle,
     403              :     lsn_range: Range<Lsn>,
     404              : }
     405              : 
     406              : impl InMemoryLayerDesc {
     407      7796694 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     408      7796694 :         self.lsn_range.clone()
     409      7796694 :     }
     410              : }
     411              : 
     412              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     413              : enum InMemoryLayerHandle {
     414              :     Open,
     415              :     Frozen(usize),
     416              : }
     417              : 
     418              : impl LayerMap {
     419              :     ///
     420              :     /// Find the latest layer (by lsn.end) that covers the given
     421              :     /// 'key', with lsn.start < 'end_lsn'.
     422              :     ///
     423              :     /// The caller of this function is the page reconstruction
     424              :     /// algorithm looking for the next relevant delta layer, or
     425              :     /// the terminal image layer. The caller will pass the lsn_floor
     426              :     /// value as end_lsn in the next call to search.
     427              :     ///
     428              :     /// If there's an image layer exactly below the given end_lsn,
     429              :     /// search should return that layer regardless if there are
     430              :     /// overlapping deltas.
     431              :     ///
     432              :     /// If the latest layer is a delta and there is an overlapping
     433              :     /// image with it below, the lsn_floor returned should be right
     434              :     /// above that image so we don't skip it in the search. Otherwise
     435              :     /// the lsn_floor returned should be the bottom of the delta layer
     436              :     /// because we should make as much progress down the lsn axis
     437              :     /// as possible. It's fine if this way we skip some overlapping
     438              :     /// deltas, because the delta we returned would contain the same
     439              :     /// wal content.
     440              :     ///
     441              :     /// TODO: This API is convoluted and inefficient. If the caller
     442              :     /// makes N search calls, we'll end up finding the same latest
     443              :     /// image layer N times. We should either cache the latest image
     444              :     /// layer result, or simplify the api to `get_latest_image` and
     445              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     446              :     ///
     447       863772 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     448       863772 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     449              : 
     450       863772 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     451       863772 :             Some(version) => version,
     452              :             None => {
     453            0 :                 return in_memory_layer.map(|desc| SearchResult {
     454            0 :                     lsn_floor: desc.get_lsn_range().start,
     455            0 :                     layer: ReadableLayerWeak::InMemoryLayer(desc),
     456            0 :                 });
     457              :             }
     458              :         };
     459              : 
     460       863772 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     461       863772 :         let latest_image = version.image_coverage.query(key.to_i128());
     462       863772 : 
     463       863772 :         Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn)
     464       863772 :     }
     465              : 
     466              :     /// Select a layer from three potential candidates (in-memory, delta and image layer).
     467              :     /// The candidates represent the first layer of each type which intersect a key range.
     468              :     ///
     469              :     /// Layer types have an implicit priority (image > delta > in-memory). For instance,
     470              :     /// if we have the option of reading an LSN range from both an image and a delta, we
     471              :     /// should read from the image.
     472      8762106 :     fn select_layer(
     473      8762106 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     474      8762106 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     475      8762106 :         in_memory_layer: Option<InMemoryLayerDesc>,
     476      8762106 :         end_lsn: Lsn,
     477      8762106 :     ) -> Option<SearchResult> {
     478      8762106 :         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
     479      8762106 :         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
     480              : 
     481      8762106 :         match (delta_layer, image_layer, in_memory_layer) {
     482        69600 :             (None, None, None) => None,
     483       346380 :             (None, Some(image), None) => {
     484       346380 :                 let lsn_floor = image.get_lsn_range().start;
     485       346380 :                 Some(SearchResult {
     486       346380 :                     layer: ReadableLayerWeak::PersistentLayer(image),
     487       346380 :                     lsn_floor,
     488       346380 :                 })
     489              :             }
     490       460143 :             (Some(delta), None, None) => {
     491       460143 :                 let lsn_floor = delta.get_lsn_range().start;
     492       460143 :                 Some(SearchResult {
     493       460143 :                     layer: ReadableLayerWeak::PersistentLayer(delta),
     494       460143 :                     lsn_floor,
     495       460143 :                 })
     496              :             }
     497      2813212 :             (Some(delta), Some(image), None) => {
     498      2813212 :                 let img_lsn = image.get_lsn_range().start;
     499      2813212 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     500      2813212 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     501      2813212 :                 if image_is_newer || image_exact_match {
     502       267696 :                     Some(SearchResult {
     503       267696 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     504       267696 :                         lsn_floor: img_lsn,
     505       267696 :                     })
     506              :                 } else {
     507              :                     // If the delta overlaps with the image in the LSN dimension, do a partial read
     508              :                     // up to the image layer.
     509      2545516 :                     let lsn_floor =
     510      2545516 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     511      2545516 :                     Some(SearchResult {
     512      2545516 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     513      2545516 :                         lsn_floor,
     514      2545516 :                     })
     515              :                 }
     516              :             }
     517        69492 :             (None, None, Some(inmem)) => {
     518        69492 :                 let lsn_floor = inmem.get_lsn_range().start;
     519        69492 :                 Some(SearchResult {
     520        69492 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     521        69492 :                     lsn_floor,
     522        69492 :                 })
     523              :             }
     524      2065895 :             (None, Some(image), Some(inmem)) => {
     525      2065895 :                 // If the in-memory layer overlaps with the image in the LSN dimension, do a partial
     526      2065895 :                 // up to the image layer.
     527      2065895 :                 let img_lsn = image.get_lsn_range().start;
     528      2065895 :                 let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end;
     529      2065895 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     530      2065895 :                 if image_is_newer || image_exact_match {
     531         5244 :                     Some(SearchResult {
     532         5244 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     533         5244 :                         lsn_floor: img_lsn,
     534         5244 :                     })
     535              :                 } else {
     536      2060651 :                     let lsn_floor =
     537      2060651 :                         std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1);
     538      2060651 :                     Some(SearchResult {
     539      2060651 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     540      2060651 :                         lsn_floor,
     541      2060651 :                     })
     542              :                 }
     543              :             }
     544      1462788 :             (Some(delta), None, Some(inmem)) => {
     545      1462788 :                 // Overlaps between delta and in-memory layers are not a valid
     546      1462788 :                 // state, but we handle them here for completeness.
     547      1462788 :                 let delta_end = delta.get_lsn_range().end;
     548      1462788 :                 let delta_is_newer = delta_end >= inmem.get_lsn_range().end;
     549      1462788 :                 let delta_exact_match = delta_end == end_lsn;
     550      1462788 :                 if delta_is_newer || delta_exact_match {
     551           48 :                     Some(SearchResult {
     552           48 :                         lsn_floor: delta.get_lsn_range().start,
     553           48 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     554           48 :                     })
     555              :                 } else {
     556              :                     // If the in-memory layer overlaps with the delta in the LSN dimension, do a partial
     557              :                     // up to the delta layer.
     558      1462740 :                     let lsn_floor =
     559      1462740 :                         std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end);
     560      1462740 :                     Some(SearchResult {
     561      1462740 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     562      1462740 :                         lsn_floor,
     563      1462740 :                     })
     564              :                 }
     565              :             }
     566      1474596 :             (Some(delta), Some(image), Some(inmem)) => {
     567      1474596 :                 // Determine the preferred persistent layer without taking the in-memory layer
     568      1474596 :                 // into consideration.
     569      1474596 :                 let persistent_res =
     570      1474596 :                     Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn)
     571      1474596 :                         .unwrap();
     572      1474596 :                 let persistent_l = match persistent_res.layer {
     573      1474596 :                     ReadableLayerWeak::PersistentLayer(l) => l,
     574            0 :                     ReadableLayerWeak::InMemoryLayer(_) => unreachable!(),
     575              :                 };
     576              : 
     577              :                 // Now handle the in-memory layer overlaps.
     578      1474596 :                 let inmem_res = if persistent_l.is_delta() {
     579      1402944 :                     Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn)
     580      1402944 :                         .unwrap()
     581              :                 } else {
     582        71652 :                     Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn)
     583        71652 :                         .unwrap()
     584              :                 };
     585              : 
     586      1474596 :                 Some(SearchResult {
     587      1474596 :                     layer: inmem_res.layer,
     588      1474596 :                     // Use the more restrictive LSN floor
     589      1474596 :                     lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor),
     590      1474596 :                 })
     591              :             }
     592              :         }
     593      8762106 :     }
     594              : 
                            /// Range variant of the layer search: builds a `RangeSearchResult` for `key_range` from the
                            /// delta and image coverage changes overlapping the range, plus the candidate in-memory layer
                            /// returned by `search_in_memory_layer(end_lsn)`.
                            ///
                            /// The historic coverage is looked up at `end_lsn - 1`. When no historic version exists at
                            /// that point, the whole range is mapped to the in-memory layer (which may be `None`).
                            // NOTE(review): `end_lsn.0 - 1` assumes `end_lsn` is non-zero; confirm callers guarantee this.
     595      6895434 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     596      6895434 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     597              : 
     598      6895434 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     599      4883718 :             Some(version) => version,
     600              :             None => {
     601      2011716 :                 return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range);
     602              :             }
     603              :         };
     604              : 
     605      4883718 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     606      4883718 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     607      4883718 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     608      4883718 : 
     609      4883718 :         let collector = RangeSearchCollector::new(
     610      4883718 :             key_range,
     611      4883718 :             end_lsn,
     612      4883718 :             in_memory_layer,
     613      4883718 :             delta_changes,
     614      4883718 :             image_changes,
     615      4883718 :         );
     616      4883718 :         collector.collect()
     617      6895434 :     }
     618              : 
     619              :     /// Start a batch of updates. The returned `BatchedUpdates` mutably borrows the map; the updates are applied on drop.
     620        10884 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     621        10884 :         BatchedUpdates { layer_map: self }
     622        10884 :     }
     623              : 
     624              :     ///
     625              :     /// Insert an on-disk layer into `self.historic`, additionally tracking it as an L0 delta if it is one.
     626              :     ///
     627              :     /// Helper function for BatchedUpdates::insert_historic
     628              :     ///
     629              :     /// TODO(chi): remove L generic so that we do not need to pass layer object. (NOTE(review): no `L` generic is visible on this signature; this TODO may be stale.)
     630        29616 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     631        29616 :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     632        29616 : 
                                // L0 deltas are also kept in `l0_delta_layers`; watchers receive the new L0 count.
     633        29616 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     634         5988 :             self.l0_delta_layers.push(layer_desc.clone().into());
     635         5988 :             self.watch_l0_deltas
     636         5988 :                 .send_replace(self.l0_delta_layers.len());
     637        23628 :         }
     638              : 
     639        29616 :         self.historic.insert(
     640        29616 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     641        29616 :             layer_desc.into(),
     642        29616 :         );
     643        29616 :     }
     644              : 
     645              :     ///
     646              :     /// Remove an on-disk layer from the map.
     647              :     ///
     648              :     /// Helper function for BatchedUpdates::remove_historic
     649              :     ///
                            /// Panics if the layer is an L0 delta and removing it does not shrink `l0_delta_layers`
                            /// by exactly one entry.
     650         3096 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     651         3096 :         self.historic
     652         3096 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     653         3096 :         let layer_key = layer_desc.key();
     654         3096 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     655         2424 :             let len_before = self.l0_delta_layers.len();
     656         2424 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     657        33144 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     658         2424 :             self.l0_delta_layers = l0_delta_layers;
     659         2424 :             self.watch_l0_deltas
     660         2424 :                 .send_replace(self.l0_delta_layers.len());
     661         2424 :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers:
     662         2424 :             // that comparison could fail at runtime because it compares (pointer, vtable) pairs.
     663         2424 :             // NOTE(review): the retain above compares `key()`s, not pointers; confirm this is still relevant.
     664         2424 :             assert_eq!(
     665         2424 :                 self.l0_delta_layers.len(),
     666         2424 :                 len_before - 1,
     667            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     668              :             );
     669          672 :         }
     670         3096 :     }
     671              : 
     672              :     /// Helper function for BatchedUpdates::drop: rebuilds `self.historic` via `rebuild()`.
     673        10896 :     pub(self) fn flush_updates(&mut self) {
     674        10896 :         self.historic.rebuild();
     675        10896 :     }
     676              : 
     677              :     /// Is there a newer image layer for the given key- and LSN-range? Or a set
     678              :     /// of image layers within the specified lsn range that cover the entire
     679              :     /// specified key range?
     680              :     ///
     681              :     /// This is used for garbage collection, to determine if an old layer can
     682              :     /// be deleted.
                            ///
                            /// Returns `true` for an empty key range, and `false` when there is no historic version at
                            /// `lsn.end - 1`.
     683           48 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     684           48 :         if key.is_empty() {
     685              :             // Vacuously true. There's a newer image for all 0 of the keys in the range.
     686            0 :             return true;
     687           48 :         }
     688              : 
     689           48 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     690           48 :             Some(v) => v,
     691            0 :             None => return false,
     692              :         };
     693              : 
     694           48 :         let start = key.start.to_i128();
     695           48 :         let end = key.end.to_i128();
     696           48 : 
                                // A coverage entry counts only if an image layer is present and it starts at or after `lsn.start`.
     697           60 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     698           60 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     699            0 :             None => false,
     700           60 :         };
     701              : 
     702              :         // Check that the start of the key range is covered
     703           48 :         if !layer_covers(version.image_coverage.query(start)) {
     704           36 :             return false;
     705           12 :         }
     706              : 
     707              :         // Check the coverage after every change point inside the key range
     708           12 :         for (_, change_val) in version.image_coverage.range(start..end) {
     709           12 :             if !layer_covers(change_val) {
     710            0 :                 return false;
     711           12 :             }
     712              :         }
     713              : 
     714           12 :         true
     715           48 :     }
     716              : 
                            /// Iterate over the descriptors of all layers stored in `self.historic`.
     717        22152 :     pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
     718        22152 :         self.historic.iter()
     719        22152 :     }
     720              : 
     721              :     /// Get a descriptor for an in-memory layer whose start LSN is strictly less than `below`.
                            ///
                            /// The open layer is preferred when it qualifies; otherwise the frozen layers are scanned
                            /// from the back and the first match is returned. Returns `None` when no layer qualifies.
     722      7759206 :     pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option<InMemoryLayerDesc> {
     723      7759206 :         let is_below = |l: &Arc<InMemoryLayer>| {
     724      6689316 :             let start_lsn = l.get_lsn_range().start;
     725      6689316 :             below > start_lsn
     726      6689316 :         };
     727              : 
     728      7759206 :         if let Some(open) = &self.open_layer {
     729      5471160 :             if is_below(open) {
     730      3625183 :                 return Some(InMemoryLayerDesc {
     731      3625183 :                     handle: InMemoryLayerHandle::Open,
     732      3625183 :                     lsn_range: open.get_lsn_range(),
     733      3625183 :                 });
     734      1845977 :             }
     735      2288046 :         }
     736              : 
                                // The handle stores the index into `frozen_layers`, which `in_memory_layer` later uses for lookup.
     737      4134023 :         self.frozen_layers
     738      4134023 :             .iter()
     739      4134023 :             .enumerate()
     740      4134023 :             .rfind(|(_idx, l)| is_below(l))
     741      4134023 :             .map(|(idx, l)| InMemoryLayerDesc {
     742       595860 :                 handle: InMemoryLayerHandle::Frozen(idx),
     743       595860 :                 lsn_range: l.get_lsn_range(),
     744      4134023 :             })
     745      7759206 :     }
     746              : 
                            /// Resolve an `InMemoryLayerDesc` handle into the in-memory layer it refers to.
                            ///
                            /// Panics if the handle is `Open` but there is no open layer, or if a `Frozen` index is out
                            /// of bounds for `frozen_layers`.
                            // NOTE(review): presumably `desc` must come from `search_in_memory_layer` on the same,
                            // unmodified map — confirm with callers.
     747      3763267 :     pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc<InMemoryLayer> {
     748      3763267 :         match desc.handle {
     749      3625183 :             InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(),
     750       138084 :             InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(),
     751              :         }
     752      3763267 :     }
     753              : 
     754              :     ///
     755              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     756              :     /// image layer that covers each range at the specified lsn (inclusive).
     757              :     /// This is used when creating new image layers.
                            ///
                            /// Each returned entry pairs a key sub-range with the image layer covering it, or `None`
                            /// where the coverage reports no image layer. Returns an empty vector when there is no
                            /// historic version at `lsn`.
     758           84 :     pub fn image_coverage(
     759           84 :         &self,
     760           84 :         key_range: &Range<Key>,
     761           84 :         lsn: Lsn,
     762           84 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     763           84 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     764           84 :             Some(v) => v,
     765            0 :             None => return vec![],
     766              :         };
     767              : 
     768           84 :         let start = key_range.start.to_i128();
     769           84 :         let end = key_range.end.to_i128();
     770           84 : 
     771           84 :         // Initialize loop variables
     772           84 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     773           84 :         let mut current_key = start;
     774           84 :         let mut current_val = version.image_coverage.query(start);
     775              : 
     776              :         // Loop through the change events, closing the current interval at each change key
     777           84 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     778            0 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     779            0 :             coverage.push((kr, current_val.take()));
     780            0 :             current_key = change_key;
     781            0 :             current_val.clone_from(&change_val);
     782            0 :         }
     783              : 
     784              :         // Add the final interval, which runs up to the end of the requested key range
     785           84 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     786           84 :         coverage.push((kr, current_val.take()));
     787           84 : 
     788           84 :         coverage
     789           84 :     }
     790              : 
     791              :     /// Check if the key range resembles that of an L0 layer: a delta layer spanning exactly `Key::MIN..Key::MAX`.
     792        51638 :     pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
     793        51638 :         is_delta_layer && key_range == &(Key::MIN..Key::MAX)
     794        51638 :     }
     795              : 
     796              :     /// This function determines which layers are counted in `count_deltas`:
     797              :     /// layers that should count towards deciding whether or not to reimage
     798              :     /// a certain partition range.
     799              :     ///
     800              :     /// There are two kinds of layers we currently consider reimage-worthy:
     801              :     ///
     802              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     803              :     /// TODO Some of these layers are very sparse and cover the entire key
     804              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     805              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     806              :     ///      based on some of these factors:
     807              :     ///      a) whether this layer has any wal in this partition range
     808              :     ///      b) the size of the layer
     809              :     ///      c) the number of images needed to cover it
     810              :     ///      d) the estimated time until we'll have to reimage over it for GC
     811              :     ///
     812              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     813              :     /// them reimage-worthy only when the entire key space can be covered by very few
     814              :     /// images (currently 1).
     815              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     816              :     ///      implement that we need to plumb a lot more context into this function
     817              :     ///      than just the current partition_range.
     818            0 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     819            0 :         // Case 1: anything that is not an L0 delta always counts
     820            0 :         if !Self::is_l0(&layer.key_range, layer.is_delta) {
     821            0 :             return true;
     822            0 :         }
     823            0 : 
     824            0 :         // Case 2: an L0 delta counts only when the partition is the whole key space
     825            0 :         if partition_range == &(Key::MIN..Key::MAX) {
     826            0 :             return true;
     827            0 :         }
     828            0 : 
     829            0 :         false
     830            0 :     }
     831              : 
     832              :     /// Count the height of the tallest stack of reimage-worthy deltas
     833              :     /// in this 2d region.
     834              :     ///
     835              :     /// If `limit` is provided we don't try to count above that number.
     836              :     ///
     837              :     /// This number is used to compute the largest number of deltas that
     838              :     /// we'll need to visit for any page reconstruction in this region.
     839              :     /// We use this heuristic to decide whether to create an image layer.
                            ///
                            /// Returns 0 when either range is empty, when `limit` is `Some(0)`, or when there is no
                            /// historic version at `lsn.end - 1`.
     840           84 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     841           84 :         // We get the delta coverage of the region, and for each part of the coverage
     842           84 :         // we recurse right underneath the delta. The recursion depth is limited by
     843           84 :         // the largest result this function could return, which is in practice between
     844           84 :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     845           84 : 
     846           84 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     847            0 :             return 0;
     848           84 :         }
     849              : 
     850           84 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     851           84 :             Some(v) => v,
     852            0 :             None => return 0,
     853              :         };
     854              : 
     855           84 :         let start = key.start.to_i128();
     856           84 :         let end = key.end.to_i128();
     857           84 : 
     858           84 :         // Initialize loop variables
     859           84 :         let mut max_stacked_deltas = 0;
     860           84 :         let mut current_key = start;
     861           84 :         let mut current_val = version.delta_coverage.query(start);
     862              : 
     863              :         // Loop through the delta coverage and recurse on each part
     864           84 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     865              :             // If there's a relevant delta in this part, add 1 and recurse down
     866            0 :             if let Some(val) = &current_val {
     867            0 :                 if val.get_lsn_range().end > lsn.start {
     868            0 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
                                            // Recurse into the LSN range strictly below this delta.
     869            0 :                     let lr = lsn.start..val.get_lsn_range().start;
     870            0 :                     if !kr.is_empty() {
                                                // NOTE(review): in recursive calls `key` is the narrowed sub-range, so case 2 of
                                                // `is_reimage_worthy` is evaluated against it — confirm this is intended.
     871            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
                                                // Cannot underflow: `limit == Some(0)` returned early and `base_count` is 0 or 1.
     872            0 :                         let new_limit = limit.map(|l| l - base_count);
     873            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     874            0 :                         max_stacked_deltas = std::cmp::max(
     875            0 :                             max_stacked_deltas,
     876            0 :                             base_count + max_stacked_deltas_underneath,
     877            0 :                         );
     878            0 :                     }
     879            0 :                 }
     880            0 :             }
     881              : 
     882            0 :             current_key = change_key;
     883            0 :             current_val.clone_from(&change_val);
     884              :         }
     885              : 
     886              :         // Consider the last part, from the final change key up to the end of the key range
     887           84 :         if let Some(val) = &current_val {
     888            0 :             if val.get_lsn_range().end > lsn.start {
     889            0 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     890            0 :                 let lr = lsn.start..val.get_lsn_range().start;
     891            0 : 
     892            0 :                 if !kr.is_empty() {
     893            0 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     894            0 :                     let new_limit = limit.map(|l| l - base_count);
     895            0 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     896            0 :                     max_stacked_deltas = std::cmp::max(
     897            0 :                         max_stacked_deltas,
     898            0 :                         base_count + max_stacked_deltas_underneath,
     899            0 :                     );
     900            0 :                 }
     901            0 :             }
     902           84 :         }
     903              : 
     904           84 :         max_stacked_deltas
     905           84 :     }
     906              : 
     907              :     /// Return all L0 delta layers currently tracked in `l0_delta_layers`
     908         9972 :     pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
     909         9972 :         &self.l0_delta_layers
     910         9972 :     }
     911              : 
     912              :     /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
                            ///
                            /// The count is published with `send_replace` whenever an L0 delta is inserted into or
                            /// removed from the map.
     913         2748 :     pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
     914         2748 :         self.watch_l0_deltas.subscribe()
     915         2748 :     }
     916              : 
     917              :     /// Debugging function that prints the contents of the layer map (open, frozen, then historic layers) with `println!`.
                            ///
                            /// Returns early with the error if dumping the open layer or any frozen layer fails.
     918              :     #[allow(unused)]
     919           12 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     920           12 :         println!("Begin dump LayerMap");
     921           12 : 
     922           12 :         println!("open_layer:");
     923           12 :         if let Some(open_layer) = &self.open_layer {
     924            0 :             open_layer.dump(verbose, ctx).await?;
     925           12 :         }
     926              : 
     927           12 :         println!("frozen_layers:");
     928           12 :         for frozen_layer in self.frozen_layers.iter() {
     929            0 :             frozen_layer.dump(verbose, ctx).await?;
     930              :         }
     931              : 
     932           12 :         println!("historic_layers:");
     933           72 :         for desc in self.iter_historic_layers() {
     934           72 :             desc.dump();
     935           72 :         }
     936           12 :         println!("End dump LayerMap");
     937           12 :         Ok(())
     938           12 :     }
     939              : 
     940              :     /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
     941              :     /// where we expect to serve reads.
     942              :     ///
     943              :     /// This function is O(N) and should be called infrequently.  The caller is responsible for
     944              :     /// looking up and updating the Layer objects for these layer descriptors.
     945         1464 :     pub fn get_visibility(
     946         1464 :         &self,
     947         1464 :         mut read_points: Vec<Lsn>,
     948         1464 :     ) -> (
     949         1464 :         Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
     950         1464 :         KeySpace,
     951         1464 :     ) {
     952              :         // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
     953              :         // KeySpace is intended to be composed statically and iterated over.
     954              :         struct KeyShadow {
     955              :             // Map of range start to range end
     956              :             inner: RangeSetBlaze<i128>,
     957              :         }
     958              : 
     959              :         impl KeyShadow {
     960         1464 :             fn new() -> Self {
     961         1464 :                 Self {
     962         1464 :                     inner: Default::default(),
     963         1464 :                 }
     964         1464 :             }
     965              : 
     966         9612 :             fn contains(&self, range: Range<Key>) -> bool {
     967         9612 :                 let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
     968         9612 :                 self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
     969         9612 :                     CheckSortedDisjoint::from([range_incl]),
     970         9612 :                 ))
     971         9612 :             }
     972              : 
     973              :             /// Add the input range to the keys covered by self.
     974              :             ///
     975              :             /// Return true if inserting this range covered some keys that were previously not covered
     976        11496 :             fn cover(&mut self, insert: Range<Key>) -> bool {
     977        11496 :                 let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
     978        11496 :                 self.inner.ranges_insert(range_incl)
     979        11496 :             }
     980              : 
     981         1488 :             fn reset(&mut self) {
     982         1488 :                 self.inner = Default::default();
     983         1488 :             }
     984              : 
     985         1464 :             fn to_keyspace(&self) -> KeySpace {
     986         1464 :                 let mut accum = KeySpaceAccum::new();
     987         1488 :                 for range_incl in self.inner.ranges() {
     988         1488 :                     let range = Range {
     989         1488 :                         start: Key::from_i128(*range_incl.start()),
     990         1488 :                         end: Key::from_i128(range_incl.end() + 1),
     991         1488 :                     };
     992         1488 :                     accum.add_range(range)
     993              :                 }
     994              : 
     995         1464 :                 accum.to_keyspace()
     996         1464 :             }
     997              :         }
     998              : 
     999              :         // The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
    1000              :         // and a ReadPoint
    1001         1464 :         read_points.sort_by_key(|rp| rp.0);
    1002         1464 :         let mut shadow = KeyShadow::new();
    1003              : 
    1004              :         // We will interleave all our read points and layers into a sorted collection
    1005              :         enum Item {
    1006              :             ReadPoint { lsn: Lsn },
    1007              :             Layer(Arc<PersistentLayerDesc>),
    1008              :         }
    1009              : 
    1010         1464 :         let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
    1011         1464 :         items.extend(self.iter_historic_layers().map(Item::Layer));
    1012         1464 :         items.extend(
    1013         1464 :             read_points
    1014         1464 :                 .into_iter()
    1015         1488 :                 .map(|rp| Item::ReadPoint { lsn: rp }),
    1016         1464 :         );
    1017         1464 : 
    1018         1464 :         // Ordering: we want to iterate like this:
    1019         1464 :         // 1. Highest LSNs first
    1020         1464 :         // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
    1021         1464 :         // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
    1022       381600 :         items.sort_by_key(|item| {
    1023       381600 :             std::cmp::Reverse(match item {
    1024       378876 :                 Item::Layer(layer) => {
    1025       378876 :                     if layer.is_delta() {
    1026       177768 :                         (Lsn(layer.get_lsn_range().end.0 - 1), 0)
    1027              :                     } else {
    1028       201108 :                         (layer.image_layer_lsn(), 1)
    1029              :                     }
    1030              :                 }
    1031         2724 :                 Item::ReadPoint { lsn } => (*lsn, 2),
    1032              :             })
    1033       381600 :         });
    1034         1464 : 
    1035         1464 :         let mut results = Vec::with_capacity(self.historic.len());
    1036         1464 : 
    1037         1464 :         let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
    1038              : 
    1039        24060 :         for item in items {
    1040        22596 :             let (reached_lsn, is_readpoint) = match &item {
    1041         1488 :                 Item::ReadPoint { lsn } => (lsn, true),
    1042        21108 :                 Item::Layer(layer) => (&layer.lsn_range.start, false),
    1043              :             };
    1044        22596 :             maybe_covered_deltas.retain(|d| {
    1045          636 :                 if *reached_lsn >= d.lsn_range.start && is_readpoint {
    1046              :                     // We encountered a readpoint within the delta layer: it is visible
    1047              : 
    1048           12 :                     results.push((d.clone(), LayerVisibilityHint::Visible));
    1049           12 :                     false
    1050          624 :                 } else if *reached_lsn < d.lsn_range.start {
    1051              :                     // We passed the layer's range without encountering a read point: it is not visible
    1052          192 :                     results.push((d.clone(), LayerVisibilityHint::Covered));
    1053          192 :                     false
    1054              :                 } else {
    1055              :                     // We're still in the delta layer: continue iterating
    1056          432 :                     true
    1057              :                 }
    1058        22596 :             });
    1059        22596 : 
    1060        22596 :             match item {
    1061         1488 :                 Item::ReadPoint { lsn: _lsn } => {
    1062         1488 :                     // TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
    1063         1488 :                     // to assume that the whole key range is visible at the branch point.
    1064         1488 :                     shadow.reset();
    1065         1488 :                 }
    1066        21108 :                 Item::Layer(layer) => {
    1067        21108 :                     let visibility = if layer.is_delta() {
    1068         9612 :                         if shadow.contains(layer.get_key_range()) {
    1069              :                             // If a layer isn't visible based on current state, we must defer deciding whether
    1070              :                             // it is truly not visible until we have advanced past the delta's range: we might
    1071              :                             // encounter another branch point within this delta layer's LSN range.
    1072          228 :                             maybe_covered_deltas.push(layer);
    1073          228 :                             continue;
    1074              :                         } else {
    1075         9384 :                             LayerVisibilityHint::Visible
    1076              :                         }
    1077              :                     } else {
    1078        11496 :                         let modified = shadow.cover(layer.get_key_range());
    1079        11496 :                         if modified {
    1080              :                             // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
    1081        11280 :                             LayerVisibilityHint::Visible
    1082              :                         } else {
    1083              :                             // An image layer in a region that was already covered
    1084          216 :                             LayerVisibilityHint::Covered
    1085              :                         }
    1086              :                     };
    1087              : 
    1088        20880 :                     results.push((layer, visibility));
    1089              :                 }
    1090              :             }
    1091              :         }
    1092              : 
    1093              :         // Drain any remaining maybe_covered deltas
    1094         1464 :         results.extend(
    1095         1464 :             maybe_covered_deltas
    1096         1464 :                 .into_iter()
    1097         1464 :                 .map(|d| (d, LayerVisibilityHint::Covered)),
    1098         1464 :         );
    1099         1464 : 
    1100         1464 :         (results, shadow.to_keyspace())
    1101         1464 :     }
    1102              : }
    1103              : 
    1104              : #[cfg(test)]
    1105              : mod tests {
    1106              :     use std::collections::HashMap;
    1107              :     use std::path::PathBuf;
    1108              : 
    1109              :     use crate::{
    1110              :         DEFAULT_PG_VERSION,
    1111              :         tenant::{harness::TenantHarness, storage_layer::LayerName},
    1112              :     };
    1113              :     use pageserver_api::key::DBDIR_KEY;
    1114              :     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
    1115              :     use tokio_util::sync::CancellationToken;
    1116              :     use utils::id::{TenantId, TimelineId};
    1117              :     use utils::shard::TenantShardId;
    1118              : 
    1119              :     use super::*;
    1120              :     use crate::tenant::IndexPart;
    1121              : 
    1122              :     #[derive(Clone)] // Test-only layer description; `create_layer_map` turns each one into a `PersistentLayerDesc`
    1123              :     struct LayerDesc {
    1124              :         key_range: Range<Key>,
    1125              :         lsn_range: Range<Lsn>,
    1126              :         is_delta: bool, // true for a delta layer, false for an image layer
    1127              :     }
    1128              : 
    1129           12 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap { // Build a `LayerMap` containing one historic layer per entry of `layers`
    1130           12 :         let mut layer_map = LayerMap::default();
    1131              : 
    1132           72 :         for layer in layers {
    1133           60 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
    1134           60 :                 layer.key_range,
    1135           60 :                 layer.lsn_range,
    1136           60 :                 layer.is_delta,
    1137           60 :             ));
    1138           60 :         }
    1139              : 
    1140           12 :         layer_map.flush_updates(); // the inserts above are the `_noflush` variant, so flush once after the whole batch
    1141           12 :         layer_map
    1142           12 :     }
    1143              : 
    1144        42492 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) { // Assert two range search results are equal after converting each `found` accumulator into a `KeySpace`
    1145        42492 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
    1146        42492 :             .found
    1147        42492 :             .into_iter()
    1148        81840 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1149        42492 :             .collect();
    1150        42492 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
    1151        42492 :             .found
    1152        42492 :             .into_iter()
    1153        81840 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1154        42492 :             .collect();
    1155        42492 : 
    1156        42492 :         assert_eq!(lhs, rhs);
    1157        42492 :     }
    1158              : 
    1159              :     #[cfg(test)] // NOTE(review): looks redundant, the enclosing `mod tests` is already gated by `#[cfg(test)]`
    1160        42480 :     fn brute_force_range_search( // Reference implementation: call `LayerMap::search` for every key in `key_range` and group the keys by search result
    1161        42480 :         layer_map: &LayerMap,
    1162        42480 :         key_range: Range<Key>,
    1163        42480 :         end_lsn: Lsn,
    1164        42480 :     ) -> RangeSearchResult {
    1165        42480 :         let mut range_search_result = RangeSearchResult::new();
    1166        42480 : 
    1167        42480 :         let mut key = key_range.start;
    1168       906240 :         while key != key_range.end {
    1169       863760 :             let res = layer_map.search(key, end_lsn);
    1170       863760 :             if let Some(res) = res { // keys for which `search` returns `None` are left out of the result
    1171       799800 :                 range_search_result
    1172       799800 :                     .found
    1173       799800 :                     .entry(res)
    1174       799800 :                     .or_default()
    1175       799800 :                     .add_key(key);
    1176       799800 :             }
    1177              : 
    1178       863760 :             key = key.next();
    1179              :         }
    1180              : 
    1181        42480 :         range_search_result
    1182        42480 :     }
    1183              : 
    1184              :     #[test] // A range search on a default-constructed layer map must yield an empty result
    1185           12 :     fn ranged_search_on_empty_layer_map() {
    1186           12 :         let layer_map = LayerMap::default();
    1187           12 :         let range = Key::from_i128(100)..Key::from_i128(200);
    1188           12 : 
    1189           12 :         let res = layer_map.range_search(range.clone(), Lsn(100));
    1190           12 :         assert_range_search_result_eq(res, RangeSearchResult::new());
    1191           12 :     }
    1192              : 
    1193              :     #[tokio::test] // Compare `range_search` with the per-key brute force for every range `start..end` where 0 <= start < end < 60
    1194           12 :     async fn ranged_search() {
    1195           12 :         let harness = TenantHarness::create("ranged_search").await.unwrap();
    1196           12 :         let (tenant, ctx) = harness.load().await;
    1197           12 :         let cancel = CancellationToken::new();
    1198           12 :         let timeline_id = TimelineId::generate();
    1199           12 :         // Create the timeline such that the in-memory layers can be written
    1200           12 :         // to the timeline directory.
    1201           12 :         tenant
    1202           12 :             .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1203           12 :             .await
    1204           12 :             .unwrap();
    1205           12 : 
    1206           12 :         let gate = utils::sync::gate::Gate::default();
    1207           12 :         let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range<Lsn>| {
    1208           12 :             let layer = InMemoryLayer::create(
    1209           12 :                 harness.conf,
    1210           12 :                 timeline_id,
    1211           12 :                 harness.tenant_shard_id,
    1212           12 :                 lsn_range.start,
    1213           12 :                 &gate,
    1214           12 :                 &cancel,
    1215           12 :                 &ctx,
    1216           12 :             )
    1217           12 :             .await
    1218           12 :             .unwrap();
    1219           12 : 
    1220           12 :             layer.freeze(lsn_range.end).await;
    1221           12 : 
    1222           12 :             layer_map.frozen_layers.push_back(Arc::new(layer));
    1223           12 :         };
    1224           12 : 
    1225           12 :         let in_memory_layer_configurations = [
    1226           12 :             vec![],
    1227           12 :             // One frozen in-memory layer at LSN 35..50, overlapping the top-most image (LSN 40)
    1228           12 :             vec![Lsn(35)..Lsn(50)],
    1229           12 :         ];
    1230           12 : 
    1231           12 :         let layers = vec![
    1232           12 :             LayerDesc {
    1233           12 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
    1234           12 :                 lsn_range: Lsn(5)..Lsn(6),
    1235           12 :                 is_delta: false,
    1236           12 :             },
    1237           12 :             LayerDesc {
    1238           12 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
    1239           12 :                 lsn_range: Lsn(5)..Lsn(20),
    1240           12 :                 is_delta: true,
    1241           12 :             },
    1242           12 :             LayerDesc {
    1243           12 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
    1244           12 :                 lsn_range: Lsn(20)..Lsn(30),
    1245           12 :                 is_delta: true,
    1246           12 :             },
    1247           12 :             LayerDesc {
    1248           12 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1249           12 :                 lsn_range: Lsn(25)..Lsn(35),
    1250           12 :                 is_delta: true,
    1251           12 :             },
    1252           12 :             LayerDesc {
    1253           12 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1254           12 :                 lsn_range: Lsn(40)..Lsn(41),
    1255           12 :                 is_delta: false,
    1256           12 :             },
    1257           12 :         ];
    1258           12 : 
    1259           12 :         let mut layer_map = create_layer_map(layers.clone()); // built once: frozen in-memory layers added below accumulate across configurations
    1260           36 :         for in_memory_layers in in_memory_layer_configurations {
    1261           36 :             for in_mem_layer_range in in_memory_layers {
    1262           12 :                 add_in_memory_layer(&mut layer_map, in_mem_layer_range).await;
    1263           12 :             }
    1264           12 : 
    1265         1464 :             for start in 0..60 {
    1266        42480 :                 for end in (start + 1)..60 {
    1267        42480 :                     let range = Key::from_i128(start)..Key::from_i128(end);
    1268        42480 :                     let result = layer_map.range_search(range.clone(), Lsn(100));
    1269        42480 :                     let expected = brute_force_range_search(&layer_map, range, Lsn(100));
    1270        42480 : 
    1271        42480 :                     eprintln!("{start}..{end}: {result:?}");
    1272        42480 : 
    1273        42480 :                     assert_range_search_result_eq(result, expected);
    1274        42480 :                 }
    1275           12 :             }
    1276           12 :         }
    1277           12 :     }
    1278              : 
    1279              :     #[test]
    1280           12 :     fn layer_visibility_basic() {
    1281           12 :         // A simple synthetic input, as a smoke test: build layers and read points by hand, then check the visibility hint computed for each layer.
    1282           12 :         let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
    1283           12 :         let timeline_id = TimelineId::generate();
    1284           12 :         let mut layer_map = LayerMap::default();
    1285           12 :         let mut updates = layer_map.batch_update();
    1286              : 
    1287              :         const FAKE_LAYER_SIZE: u64 = 1024;
    1288              : 
    1289           12 :         let inject_delta = |updates: &mut BatchedUpdates,
    1290              :                             key_start: i128,
    1291              :                             key_end: i128,
    1292              :                             lsn_start: u64,
    1293           84 :                             lsn_end: u64| {
    1294           84 :             let desc = PersistentLayerDesc::new_delta(
    1295           84 :                 tenant_shard_id,
    1296           84 :                 timeline_id,
    1297           84 :                 Range {
    1298           84 :                     start: Key::from_i128(key_start),
    1299           84 :                     end: Key::from_i128(key_end),
    1300           84 :                 },
    1301           84 :                 Range {
    1302           84 :                     start: Lsn(lsn_start),
    1303           84 :                     end: Lsn(lsn_end),
    1304           84 :                 },
    1305           84 :                 1024, // NOTE(review): same value as FAKE_LAYER_SIZE (used by `inject_image` below) - consider using the constant here too
    1306           84 :             );
    1307           84 :             updates.insert_historic(desc.clone());
    1308           84 :             desc
    1309           84 :         };
    1310              : 
    1311           12 :         let inject_image =
    1312           72 :             |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
    1313           72 :                 let desc = PersistentLayerDesc::new_img(
    1314           72 :                     tenant_shard_id,
    1315           72 :                     timeline_id,
    1316           72 :                     Range {
    1317           72 :                         start: Key::from_i128(key_start),
    1318           72 :                         end: Key::from_i128(key_end),
    1319           72 :                     },
    1320           72 :                     Lsn(lsn),
    1321           72 :                     FAKE_LAYER_SIZE,
    1322           72 :                 );
    1323           72 :                 updates.insert_historic(desc.clone());
    1324           72 :                 desc
    1325           72 :             };
    1326              : 
    1327              :         //
    1328              :         // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
    1329              :         // we expect to handle.  You can follow these examples through in the same order as they would be processed
    1330              :         // by the function under test.
    1331              :         //
    1332              : 
    1333           12 :         let mut read_points = vec![Lsn(1000)];
    1334           12 : 
    1335           12 :         // A delta ahead of any image layer
    1336           12 :         let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
    1337           12 : 
    1338           12 :         // An image layer is visible and covers some layers beneath itself
    1339           12 :         let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
    1340           12 : 
    1341           12 :         // A delta layer covered by the image layer: should be covered
    1342           12 :         let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
    1343           12 : 
    1344           12 :         // A delta layer partially covered by an image layer: should be visible
    1345           12 :         let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
    1346           12 : 
    1347           12 :         // A delta layer not covered by an image layer: should be visible
    1348           12 :         let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
    1349           12 : 
    1350           12 :         // An image layer covered by the image layer above: should be covered
    1351           12 :         let covered_image = inject_image(&mut updates, 10, 20, 89);
    1352           12 : 
    1353           12 :         // An image layer partially covered by an image layer: should be visible
    1354           12 :         let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
    1355           12 : 
    1356           12 :         // An image layer not covered by an image layer: should be visible
    1357           12 :         let not_covered_image = inject_image(&mut updates, 1, 4, 89);
    1358           12 : 
    1359           12 :         // A read point: this will make subsequent layers below here visible, even if there are
    1360           12 :         // more recent layers covering them.
    1361           12 :         read_points.push(Lsn(80));
    1362           12 : 
    1363           12 :         // A delta layer covered by an image layer at a higher LSN, but visible to a readpoint below that covering layer
    1364           12 :         let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
    1365           12 : 
    1366           12 :         // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
    1367           12 :         // the read point should make it visible, even though its end LSN is covered
    1368           12 :         let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
    1369           12 :         let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
    1370           12 :         read_points.push(Lsn(65));
    1371           12 :         let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
    1372           12 : 
    1373           12 :         let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
    1374           12 : 
    1375           12 :         updates.flush();
    1376           12 : 
    1377           12 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1378           12 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1379           12 : 
    1380           12 :         assert_eq!(
    1381           12 :             layer_visibilities.get(&ahead_layer),
    1382           12 :             Some(&LayerVisibilityHint::Visible)
    1383           12 :         );
    1384           12 :         assert_eq!(
    1385           12 :             layer_visibilities.get(&visible_covering_img),
    1386           12 :             Some(&LayerVisibilityHint::Visible)
    1387           12 :         );
    1388           12 :         assert_eq!(
    1389           12 :             layer_visibilities.get(&covered_delta),
    1390           12 :             Some(&LayerVisibilityHint::Covered)
    1391           12 :         );
    1392           12 :         assert_eq!(
    1393           12 :             layer_visibilities.get(&partially_covered_delta),
    1394           12 :             Some(&LayerVisibilityHint::Visible)
    1395           12 :         );
    1396           12 :         assert_eq!(
    1397           12 :             layer_visibilities.get(&not_covered_delta),
    1398           12 :             Some(&LayerVisibilityHint::Visible)
    1399           12 :         );
    1400           12 :         assert_eq!(
    1401           12 :             layer_visibilities.get(&covered_image),
    1402           12 :             Some(&LayerVisibilityHint::Covered)
    1403           12 :         );
    1404           12 :         assert_eq!(
    1405           12 :             layer_visibilities.get(&partially_covered_image),
    1406           12 :             Some(&LayerVisibilityHint::Visible)
    1407           12 :         );
    1408           12 :         assert_eq!(
    1409           12 :             layer_visibilities.get(&not_covered_image),
    1410           12 :             Some(&LayerVisibilityHint::Visible)
    1411           12 :         );
    1412           12 :         assert_eq!(
    1413           12 :             layer_visibilities.get(&covered_delta_below_read_point),
    1414           12 :             Some(&LayerVisibilityHint::Visible)
    1415           12 :         );
    1416           12 :         assert_eq!(
    1417           12 :             layer_visibilities.get(&covering_img_between_read_points),
    1418           12 :             Some(&LayerVisibilityHint::Visible)
    1419           12 :         );
    1420           12 :         assert_eq!(
    1421           12 :             layer_visibilities.get(&covered_delta_between_read_points),
    1422           12 :             Some(&LayerVisibilityHint::Covered)
    1423           12 :         );
    1424           12 :         assert_eq!(
    1425           12 :             layer_visibilities.get(&covered_delta_intersects_read_point),
    1426           12 :             Some(&LayerVisibilityHint::Visible)
    1427           12 :         );
    1428           12 :         assert_eq!(
    1429           12 :             layer_visibilities.get(&visible_img_after_last_read_point),
    1430           12 :             Some(&LayerVisibilityHint::Visible)
    1431           12 :         );
    1432              : 
    1433              :         // Shadow should include all the images at or below the lowest read point (here only the image at LSN 65)
    1434           12 :         let expected_shadow = KeySpace {
    1435           12 :             ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
    1436           12 :         };
    1437           12 :         assert_eq!(shadow, expected_shadow);
    1438           12 :     }
    1439              : 
    1440           12 :     fn fixture_path(relative: &str) -> PathBuf { // Resolve `relative` against this crate's manifest directory
    1441           12 :         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative) // `env!` is expanded at compile time
    1442           12 :     }
    1443              : 
    1444              :     #[test]
    1445           12 :     fn layer_visibility_realistic() {
    1446           12 :         // Load a large example layermap
    1447           12 :         let index_raw = std::fs::read_to_string(fixture_path(
    1448           12 :             "test_data/indices/mixed_workload/index_part.json",
    1449           12 :         ))
    1450           12 :         .unwrap();
    1451           12 :         let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
    1452           12 : 
    1453           12 :         let tenant_id = TenantId::generate();
    1454           12 :         let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1455           12 :         let timeline_id = TimelineId::generate();
    1456           12 : 
    1457           12 :         let mut layer_map = LayerMap::default();
    1458           12 :         let mut updates = layer_map.batch_update();
    1459        18780 :         for (layer_name, layer_metadata) in index.layer_metadata {
    1460        18768 :             let layer_desc = match layer_name {
    1461         9624 :                 LayerName::Image(layer_name) => PersistentLayerDesc {
    1462         9624 :                     key_range: layer_name.key_range.clone(),
    1463         9624 :                     lsn_range: layer_name.lsn_as_range(),
    1464         9624 :                     tenant_shard_id,
    1465         9624 :                     timeline_id,
    1466         9624 :                     is_delta: false,
    1467         9624 :                     file_size: layer_metadata.file_size,
    1468         9624 :                 },
    1469         9144 :                 LayerName::Delta(layer_name) => PersistentLayerDesc {
    1470         9144 :                     key_range: layer_name.key_range,
    1471         9144 :                     lsn_range: layer_name.lsn_range,
    1472         9144 :                     tenant_shard_id,
    1473         9144 :                     timeline_id,
    1474         9144 :                     is_delta: true,
    1475         9144 :                     file_size: layer_metadata.file_size,
    1476         9144 :                 },
    1477              :             };
    1478        18768 :             updates.insert_historic(layer_desc);
    1479              :         }
    1480           12 :         updates.flush();
    1481           12 : 
    1482           12 :         let read_points = vec![index.metadata.disk_consistent_lsn()];
    1483           12 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1484        18780 :         for (layer_desc, visibility) in &layer_visibilities {
    1485        18768 :             tracing::info!("{layer_desc:?}: {visibility:?}");
    1486        18768 :             eprintln!("{layer_desc:?}: {visibility:?}");
    1487              :         }
    1488              : 
    1489              :         // The shadow should be non-empty, since there were some image layers
    1490           12 :         assert!(!shadow.ranges.is_empty());
    1491              : 
    1492              :         // At least some layers should be marked covered
    1493           12 :         assert!(
    1494           12 :             layer_visibilities
    1495           12 :                 .iter()
    1496          228 :                 .any(|i| matches!(i.1, LayerVisibilityHint::Covered))
    1497           12 :         );
    1498              : 
    1499           12 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1500              : 
    1501              :         // Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
    1502        18780 :         for (layer_desc, visible) in &layer_visibilities {
    1503        18768 :             let mut coverage = KeySpaceRandomAccum::new();
    1504        18768 :             let mut covered_by = Vec::new();
    1505              : 
    1506     29353152 :             for other_layer in layer_map.iter_historic_layers() {
    1507     29353152 :                 if &other_layer == layer_desc {
    1508        18768 :                     continue;
    1509     29334384 :                 }
    1510     29334384 :                 if !other_layer.is_delta()
    1511     15042312 :                     && other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
    1512      7581072 :                     && other_layer.key_range.start <= layer_desc.key_range.end
    1513      2791824 :                     && layer_desc.key_range.start <= other_layer.key_range.end
    1514       513876 :                 {
    1515       513876 :                     coverage.add_range(other_layer.get_key_range());
    1516       513876 :                     covered_by.push((*other_layer).clone());
    1517     28820508 :                 }
    1518              :             }
    1519        18768 :             let coverage = coverage.to_keyspace();
    1520              : 
    1521        18768 :             let expect_visible = if coverage.ranges.len() == 1
    1522         4536 :                 && coverage.contains(&layer_desc.key_range.start)
    1523          204 :                 && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
    1524              :             {
    1525          120 :                 LayerVisibilityHint::Covered
    1526              :             } else {
    1527        18648 :                 LayerVisibilityHint::Visible
    1528              :             };
    1529              : 
    1530        18768 :             if expect_visible != *visible {
    1531            0 :                 eprintln!(
    1532            0 :                     "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
    1533            0 :                     layer_desc.key_range.start,
    1534            0 :                     layer_desc.key_range.end,
    1535            0 :                     layer_desc.lsn_range.start,
    1536            0 :                     layer_desc.lsn_range.end,
    1537            0 :                     layer_desc.is_delta()
    1538            0 :                 );
    1539            0 :                 if expect_visible == LayerVisibilityHint::Covered {
    1540            0 :                     eprintln!("Covered by:");
    1541            0 :                     for other in covered_by {
    1542            0 :                         eprintln!(
    1543            0 :                             "  {}..{} @ {}",
    1544            0 :                             other.get_key_range().start,
    1545            0 :                             other.get_key_range().end,
    1546            0 :                             other.image_layer_lsn()
    1547            0 :                         );
    1548            0 :                     }
    1549            0 :                     if let Some(range) = coverage.ranges.first() {
    1550            0 :                         eprintln!(
    1551            0 :                             "Total coverage from contributing layers: {}..{}",
    1552            0 :                             range.start, range.end
    1553            0 :                         );
    1554            0 :                     } else {
    1555            0 :                         eprintln!(
    1556            0 :                             "Total coverage from contributing layers: {:?}",
    1557            0 :                             coverage.ranges
    1558            0 :                         );
    1559            0 :                     }
    1560            0 :                 }
    1561        18768 :             }
    1562        18768 :             assert_eq!(expect_visible, *visible);
    1563              :         }
    1564              : 
    1565              :         // Sanity: the layer that holds latest data for the DBDIR key should always be visible
    1566              :         // (just using this key as a key that will always exist for any layermap fixture)
    1567           12 :         let dbdir_layer = {
    1568           12 :             let readable_layer = layer_map
    1569           12 :                 .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
    1570           12 :                 .unwrap();
    1571           12 : 
    1572           12 :             match readable_layer.layer {
    1573           12 :                 ReadableLayerWeak::PersistentLayer(desc) => desc,
    1574            0 :                 ReadableLayerWeak::InMemoryLayer(_) => unreachable!(""),
    1575              :             }
    1576              :         };
    1577           12 :         assert!(matches!(
    1578           12 :             layer_visibilities.get(&dbdir_layer).unwrap(),
    1579              :             LayerVisibilityHint::Visible
    1580              :         ));
    1581           12 :     }
    1582              : }
    1583              : 
    1584              : #[cfg(test)]
    1585              : mod select_layer_tests {
    1586              :     use super::*;
    1587              : 
    1588          204 :     fn create_persistent_layer(
    1589          204 :         start_lsn: u64,
    1590          204 :         end_lsn: u64,
    1591          204 :         is_delta: bool,
    1592          204 :     ) -> Arc<PersistentLayerDesc> {
    1593          204 :         if !is_delta {
    1594           96 :             assert_eq!(end_lsn, start_lsn + 1);
    1595          108 :         }
    1596              : 
    1597          204 :         Arc::new(PersistentLayerDesc::new_test(
    1598          204 :             Key::MIN..Key::MAX,
    1599          204 :             Lsn(start_lsn)..Lsn(end_lsn),
    1600          204 :             is_delta,
    1601          204 :         ))
    1602          204 :     }
    1603              : 
    1604           72 :     fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc {
    1605           72 :         InMemoryLayerDesc {
    1606           72 :             handle: InMemoryLayerHandle::Open,
    1607           72 :             lsn_range: Lsn(start_lsn)..Lsn(end_lsn),
    1608           72 :         }
    1609           72 :     }
    1610              : 
    1611              :     #[test]
    1612           12 :     fn test_select_layer_empty() {
    1613           12 :         assert!(LayerMap::select_layer(None, None, None, Lsn(100)).is_none());
    1614           12 :     }
    1615              : 
    1616              :     #[test]
    1617           12 :     fn test_select_layer_only_delta() {
    1618           12 :         let delta = create_persistent_layer(10, 20, true);
    1619           12 :         let result = LayerMap::select_layer(Some(delta.clone()), None, None, Lsn(100)).unwrap();
    1620           12 : 
    1621           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1622           12 :         assert!(
    1623           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1624              :         );
    1625           12 :     }
    1626              : 
    1627              :     #[test]
    1628           12 :     fn test_select_layer_only_image() {
    1629           12 :         let image = create_persistent_layer(10, 11, false);
    1630           12 :         let result = LayerMap::select_layer(None, Some(image.clone()), None, Lsn(100)).unwrap();
    1631           12 : 
    1632           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1633           12 :         assert!(
    1634           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1635              :         );
    1636           12 :     }
    1637              : 
    1638              :     #[test]
    1639           12 :     fn test_select_layer_only_inmem() {
    1640           12 :         let inmem = create_inmem_layer(10, 20);
    1641           12 :         let result = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap();
    1642           12 : 
    1643           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1644           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1645           12 :     }
    1646              : 
    1647              :     #[test]
    1648           12 :     fn test_select_layer_image_inside_delta() {
    1649           12 :         let delta = create_persistent_layer(10, 20, true);
    1650           12 :         let image = create_persistent_layer(15, 16, false);
    1651           12 : 
    1652           12 :         let result =
    1653           12 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(100))
    1654           12 :                 .unwrap();
    1655           12 : 
    1656           12 :         assert_eq!(result.lsn_floor, Lsn(16));
    1657           12 :         assert!(
    1658           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1659              :         );
    1660              : 
    1661           12 :         let result = LayerMap::select_layer(
    1662           12 :             Some(delta.clone()),
    1663           12 :             Some(image.clone()),
    1664           12 :             None,
    1665           12 :             result.lsn_floor,
    1666           12 :         )
    1667           12 :         .unwrap();
    1668           12 : 
    1669           12 :         assert_eq!(result.lsn_floor, Lsn(15));
    1670           12 :         assert!(
    1671           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1672              :         );
    1673           12 :     }
    1674              : 
    1675              :     #[test]
    1676           12 :     fn test_select_layer_newer_image() {
    1677           12 :         let delta = create_persistent_layer(10, 20, true);
    1678           12 :         let image = create_persistent_layer(25, 26, false);
    1679           12 : 
    1680           12 :         let result =
    1681           12 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1682           12 :                 .unwrap();
    1683           12 : 
    1684           12 :         assert_eq!(result.lsn_floor, Lsn(25));
    1685           12 :         assert!(
    1686           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1687              :         );
    1688              : 
    1689           12 :         let result =
    1690           12 :             LayerMap::select_layer(Some(delta.clone()), None, None, result.lsn_floor).unwrap();
    1691           12 : 
    1692           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1693           12 :         assert!(
    1694           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1695              :         );
    1696           12 :     }
    1697              : 
    1698              :     #[test]
    1699           12 :     fn test_select_layer_delta_with_older_image() {
    1700           12 :         let delta = create_persistent_layer(15, 25, true);
    1701           12 :         let image = create_persistent_layer(10, 11, false);
    1702           12 : 
    1703           12 :         let result =
    1704           12 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1705           12 :                 .unwrap();
    1706           12 : 
    1707           12 :         assert_eq!(result.lsn_floor, Lsn(15));
    1708           12 :         assert!(
    1709           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1710              :         );
    1711              : 
    1712           12 :         let result =
    1713           12 :             LayerMap::select_layer(None, Some(image.clone()), None, result.lsn_floor).unwrap();
    1714           12 : 
    1715           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1716           12 :         assert!(
    1717           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1718              :         );
    1719           12 :     }
    1720              : 
    1721              :     #[test]
    1722           12 :     fn test_select_layer_image_inside_inmem() {
    1723           12 :         let image = create_persistent_layer(15, 16, false);
    1724           12 :         let inmem = create_inmem_layer(10, 25);
    1725           12 : 
    1726           12 :         let result =
    1727           12 :             LayerMap::select_layer(None, Some(image.clone()), Some(inmem.clone()), Lsn(30))
    1728           12 :                 .unwrap();
    1729           12 : 
    1730           12 :         assert_eq!(result.lsn_floor, Lsn(16));
    1731           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1732              : 
    1733           12 :         let result = LayerMap::select_layer(
    1734           12 :             None,
    1735           12 :             Some(image.clone()),
    1736           12 :             Some(inmem.clone()),
    1737           12 :             result.lsn_floor,
    1738           12 :         )
    1739           12 :         .unwrap();
    1740           12 : 
    1741           12 :         assert_eq!(result.lsn_floor, Lsn(15));
    1742           12 :         assert!(
    1743           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1744              :         );
    1745              : 
    1746           12 :         let result =
    1747           12 :             LayerMap::select_layer(None, None, Some(inmem.clone()), result.lsn_floor).unwrap();
    1748           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1749           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1750           12 :     }
    1751              : 
    1752              :     #[test]
    1753           12 :     fn test_select_layer_delta_inside_inmem() {
    1754           12 :         let delta_top = create_persistent_layer(15, 20, true);
    1755           12 :         let delta_bottom = create_persistent_layer(10, 15, true);
    1756           12 :         let inmem = create_inmem_layer(15, 25);
    1757           12 : 
    1758           12 :         let result =
    1759           12 :             LayerMap::select_layer(Some(delta_top.clone()), None, Some(inmem.clone()), Lsn(30))
    1760           12 :                 .unwrap();
    1761           12 : 
    1762           12 :         assert_eq!(result.lsn_floor, Lsn(20));
    1763           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1764              : 
    1765           12 :         let result = LayerMap::select_layer(
    1766           12 :             Some(delta_top.clone()),
    1767           12 :             None,
    1768           12 :             Some(inmem.clone()),
    1769           12 :             result.lsn_floor,
    1770           12 :         )
    1771           12 :         .unwrap();
    1772           12 : 
    1773           12 :         assert_eq!(result.lsn_floor, Lsn(15));
    1774           12 :         assert!(
    1775           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_top))
    1776              :         );
    1777              : 
    1778           12 :         let result = LayerMap::select_layer(
    1779           12 :             Some(delta_bottom.clone()),
    1780           12 :             None,
    1781           12 :             Some(inmem.clone()),
    1782           12 :             result.lsn_floor,
    1783           12 :         )
    1784           12 :         .unwrap();
    1785           12 :         assert_eq!(result.lsn_floor, Lsn(10));
    1786           12 :         assert!(
    1787           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_bottom))
    1788              :         );
    1789           12 :     }
    1790              : 
    1791              :     #[test]
    1792           12 :     fn test_select_layer_all_overlap_1() {
    1793           12 :         let inmem = create_inmem_layer(10, 30);
    1794           12 :         let delta = create_persistent_layer(15, 25, true);
    1795           12 :         let image = create_persistent_layer(20, 21, false);
    1796           12 : 
    1797           12 :         let result = LayerMap::select_layer(
    1798           12 :             Some(delta.clone()),
    1799           12 :             Some(image.clone()),
    1800           12 :             Some(inmem.clone()),
    1801           12 :             Lsn(50),
    1802           12 :         )
    1803           12 :         .unwrap();
    1804           12 : 
    1805           12 :         assert_eq!(result.lsn_floor, Lsn(25));
    1806           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1807              : 
    1808           12 :         let result = LayerMap::select_layer(
    1809           12 :             Some(delta.clone()),
    1810           12 :             Some(image.clone()),
    1811           12 :             Some(inmem.clone()),
    1812           12 :             result.lsn_floor,
    1813           12 :         )
    1814           12 :         .unwrap();
    1815           12 : 
    1816           12 :         assert_eq!(result.lsn_floor, Lsn(21));
    1817           12 :         assert!(
    1818           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1819              :         );
    1820              : 
    1821           12 :         let result = LayerMap::select_layer(
    1822           12 :             Some(delta.clone()),
    1823           12 :             Some(image.clone()),
    1824           12 :             Some(inmem.clone()),
    1825           12 :             result.lsn_floor,
    1826           12 :         )
    1827           12 :         .unwrap();
    1828           12 : 
    1829           12 :         assert_eq!(result.lsn_floor, Lsn(20));
    1830           12 :         assert!(
    1831           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1832              :         );
    1833           12 :     }
    1834              : 
    1835              :     #[test]
    1836           12 :     fn test_select_layer_all_overlap_2() {
    1837           12 :         let inmem = create_inmem_layer(20, 30);
    1838           12 :         let delta = create_persistent_layer(10, 40, true);
    1839           12 :         let image = create_persistent_layer(25, 26, false);
    1840           12 : 
    1841           12 :         let result = LayerMap::select_layer(
    1842           12 :             Some(delta.clone()),
    1843           12 :             Some(image.clone()),
    1844           12 :             Some(inmem.clone()),
    1845           12 :             Lsn(50),
    1846           12 :         )
    1847           12 :         .unwrap();
    1848           12 : 
    1849           12 :         assert_eq!(result.lsn_floor, Lsn(26));
    1850           12 :         assert!(
    1851           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1852              :         );
    1853              : 
    1854           12 :         let result = LayerMap::select_layer(
    1855           12 :             Some(delta.clone()),
    1856           12 :             Some(image.clone()),
    1857           12 :             Some(inmem.clone()),
    1858           12 :             result.lsn_floor,
    1859           12 :         )
    1860           12 :         .unwrap();
    1861           12 : 
    1862           12 :         assert_eq!(result.lsn_floor, Lsn(25));
    1863           12 :         assert!(
    1864           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1865              :         );
    1866           12 :     }
    1867              : 
    1868              :     #[test]
    1869           12 :     fn test_select_layer_all_overlap_3() {
    1870           12 :         let inmem = create_inmem_layer(30, 40);
    1871           12 :         let delta = create_persistent_layer(10, 30, true);
    1872           12 :         let image = create_persistent_layer(20, 21, false);
    1873           12 : 
    1874           12 :         let result = LayerMap::select_layer(
    1875           12 :             Some(delta.clone()),
    1876           12 :             Some(image.clone()),
    1877           12 :             Some(inmem.clone()),
    1878           12 :             Lsn(50),
    1879           12 :         )
    1880           12 :         .unwrap();
    1881           12 : 
    1882           12 :         assert_eq!(result.lsn_floor, Lsn(30));
    1883           12 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1884              : 
    1885           12 :         let result = LayerMap::select_layer(
    1886           12 :             Some(delta.clone()),
    1887           12 :             Some(image.clone()),
    1888           12 :             None,
    1889           12 :             result.lsn_floor,
    1890           12 :         )
    1891           12 :         .unwrap();
    1892           12 : 
    1893           12 :         assert_eq!(result.lsn_floor, Lsn(21));
    1894           12 :         assert!(
    1895           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1896              :         );
    1897              : 
    1898           12 :         let result = LayerMap::select_layer(
    1899           12 :             Some(delta.clone()),
    1900           12 :             Some(image.clone()),
    1901           12 :             None,
    1902           12 :             result.lsn_floor,
    1903           12 :         )
    1904           12 :         .unwrap();
    1905           12 : 
    1906           12 :         assert_eq!(result.lsn_floor, Lsn(20));
    1907           12 :         assert!(
    1908           12 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1909              :         );
    1910           12 :     }
    1911              : }
        

Generated by: LCOV version 2.1-beta