LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions) Coverage Total Hit
Test: 17080b14f46954d6812ea0a7dad4b2247e0840a8.info Lines: 91.9 % 1101 1012
Test Date: 2025-07-08 18:30:10 Functions: 95.2 % 84 80

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists of all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! are frozen, and it is split up into new image and delta layers and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use std::collections::{HashMap, VecDeque};
      50              : use std::iter::Peekable;
      51              : use std::ops::Range;
      52              : use std::sync::Arc;
      53              : 
      54              : use anyhow::Result;
      55              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      56              : pub use historic_layer_coverage::LayerKey;
      57              : use pageserver_api::key::Key;
      58              : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
      59              : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
      60              : use tokio::sync::watch;
      61              : use utils::lsn::Lsn;
      62              : 
      63              : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
      64              : use crate::context::RequestContext;
      65              : use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak};
      66              : 
      67              : ///
      68              : /// LayerMap tracks what layers exist on a timeline.
      69              : ///
      70              : pub struct LayerMap {
      71              :     //
      72              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      73              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      74              :     // where the start LSN of the next InMemoryLayer that is to be created.
      75              :     //
      76              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      77              :     pub next_open_layer_at: Option<Lsn>,
      78              : 
      79              :     ///
      80              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      81              :     /// are no longer added to, but haven't been written out to disk
      82              :     /// yet. They contain WAL older than the current 'open_layer' or
      83              :     /// 'next_open_layer_at', but newer than any historic layer.
      84              :     /// The frozen layers are in order from oldest to newest, so that
      85              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      86              :     /// in the 'front'.
      87              :     ///
      88              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      89              : 
      90              :     /// Index of the historic layers optimized for search
      91              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      92              : 
      93              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
      94              :     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
      95              :     ///
      96              :     /// NB: make sure to notify `watch_l0_deltas` on changes.
      97              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      98              : 
      99              :     /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
     100              :     watch_l0_deltas: watch::Sender<usize>,
     101              : }
     102              : 
     103              : impl Default for LayerMap {
     104          238 :     fn default() -> Self {
     105          238 :         Self {
     106          238 :             open_layer: Default::default(),
     107          238 :             next_open_layer_at: Default::default(),
     108          238 :             frozen_layers: Default::default(),
     109          238 :             historic: Default::default(),
     110          238 :             l0_delta_layers: Default::default(),
     111          238 :             watch_l0_deltas: watch::channel(0).0,
     112          238 :         }
     113          238 :     }
     114              : }
     115              : 
     116              : /// The primary update API for the layer map.
     117              : ///
     118              : /// Batching historic layer insertions and removals is good for
     119              : /// performance and this struct helps us do that correctly.
     120              : #[must_use]
     121              : pub struct BatchedUpdates<'a> {
     122              :     // While we hold this exclusive reference to the layer map the type checker
     123              :     // will prevent us from accidentally reading any unflushed updates.
     124              :     layer_map: &'a mut LayerMap,
     125              : }
     126              : 
     127              : /// Provide ability to batch more updates while hiding the read
     128              : /// API so we don't accidentally read without flushing.
     129              : impl BatchedUpdates<'_> {
     130              :     ///
     131              :     /// Insert an on-disk layer.
     132              :     ///
     133              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     134         2475 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     135         2475 :         self.layer_map.insert_historic_noflush(layer_desc)
     136         2475 :     }
     137              : 
     138              :     ///
     139              :     /// Remove an on-disk layer from the map.
     140              :     ///
     141              :     /// This should be called when the corresponding file on disk has been deleted.
     142              :     ///
     143          261 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     144          261 :         self.layer_map.remove_historic_noflush(layer_desc)
     145          261 :     }
     146              : 
     147              :     // We will flush on drop anyway, but this method makes it
     148              :     // more explicit that there is some work being done.
     149              :     /// Apply all updates
     150          819 :     pub fn flush(self) {
     151              :         // Flush happens on drop
     152          819 :     }
     153              : }
     154              : 
     155              : // Ideally the flush() method should be called explicitly for more
     156              : // controlled execution. But if we forget we'd rather flush on drop
     157              : // than panic later or read without flushing.
     158              : //
     159              : // TODO maybe warn if flush hasn't explicitly been called
     160              : impl Drop for BatchedUpdates<'_> {
     161          819 :     fn drop(&mut self) {
     162          819 :         self.layer_map.flush_updates();
     163          819 :     }
     164              : }
     165              : 
     166              : /// Return value of LayerMap::search
     167              : #[derive(Eq, PartialEq, Debug, Hash)]
     168              : pub struct SearchResult {
     169              :     pub layer: ReadableLayerWeak,
     170              :     pub lsn_floor: Lsn,
     171              : }
     172              : 
     173              : /// Return value of [`LayerMap::range_search`]
     174              : ///
     175              : /// Contains a mapping from a layer description to a keyspace
     176              : /// accumulator that contains all the keys which intersect the layer
     177              : /// from the original search space.
     178              : #[derive(Debug)]
     179              : pub struct RangeSearchResult {
     180              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     181              : }
     182              : 
     183              : impl RangeSearchResult {
     184       526937 :     fn new() -> Self {
     185       526937 :         Self {
     186       526937 :             found: HashMap::new(),
     187       526937 :         }
     188       526937 :     }
     189              : 
     190       167495 :     fn map_to_in_memory_layer(
     191       167495 :         in_memory_layer: Option<InMemoryLayerDesc>,
     192       167495 :         range: Range<Key>,
     193       167495 :     ) -> RangeSearchResult {
     194       167495 :         match in_memory_layer {
     195        54685 :             Some(inmem) => {
     196        54685 :                 let search_result = SearchResult {
     197        54685 :                     lsn_floor: inmem.get_lsn_range().start,
     198        54685 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     199        54685 :                 };
     200              : 
     201        54685 :                 let mut accum = KeySpaceAccum::new();
     202        54685 :                 accum.add_range(range);
     203        54685 :                 RangeSearchResult {
     204        54685 :                     found: HashMap::from([(search_result, accum)]),
     205        54685 :                 }
     206              :             }
     207       112810 :             None => RangeSearchResult::new(),
     208              :         }
     209       167495 :     }
     210              : }
     211              : 
     212              : /// Collector for results of range search queries on the LayerMap.
     213              : /// It should be provided with two iterators for the delta and image coverage
     214              : /// that contain all the changes for layers which intersect the range.
     215              : struct RangeSearchCollector<Iter>
     216              : where
     217              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     218              : {
     219              :     in_memory_layer: Option<InMemoryLayerDesc>,
     220              :     delta_coverage: Peekable<Iter>,
     221              :     image_coverage: Peekable<Iter>,
     222              :     key_range: Range<Key>,
     223              :     end_lsn: Lsn,
     224              : 
     225              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     226              :     current_image: Option<Arc<PersistentLayerDesc>>,
     227              : 
     228              :     result: RangeSearchResult,
     229              : }
     230              : 
     231              : #[derive(Debug)]
     232              : enum NextLayerType {
     233              :     Delta(i128),
     234              :     Image(i128),
     235              :     Both(i128),
     236              : }
     237              : 
     238              : impl NextLayerType {
     239       822372 :     fn next_change_at_key(&self) -> Key {
     240       822372 :         match self {
     241        78943 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     242       317601 :             NextLayerType::Image(at) => Key::from_i128(*at),
     243       425828 :             NextLayerType::Both(at) => Key::from_i128(*at),
     244              :         }
     245       822372 :     }
     246              : }
     247              : 
     248              : impl<Iter> RangeSearchCollector<Iter>
     249              : where
     250              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     251              : {
     252       410586 :     fn new(
     253       410586 :         key_range: Range<Key>,
     254       410586 :         end_lsn: Lsn,
     255       410586 :         in_memory_layer: Option<InMemoryLayerDesc>,
     256       410586 :         delta_coverage: Iter,
     257       410586 :         image_coverage: Iter,
     258       410586 :     ) -> Self {
     259       410586 :         Self {
     260       410586 :             in_memory_layer,
     261       410586 :             delta_coverage: delta_coverage.peekable(),
     262       410586 :             image_coverage: image_coverage.peekable(),
     263       410586 :             key_range,
     264       410586 :             end_lsn,
     265       410586 :             current_delta: None,
     266       410586 :             current_image: None,
     267       410586 :             result: RangeSearchResult::new(),
     268       410586 :         }
     269       410586 :     }
     270              : 
     271              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     272              :     /// One pointer tracks the start of the current range and the other tracks
     273              :     /// the beginning of the next range which will overlap with the next change
     274              :     /// in coverage across both image and delta.
     275       410586 :     fn collect(mut self) -> RangeSearchResult {
     276       410586 :         let next_layer_type = self.choose_next_layer_type();
     277       407351 :         let mut current_range_start = match next_layer_type {
     278              :             None => {
     279              :                 // No changes for the range
     280         3235 :                 self.pad_range(self.key_range.clone());
     281         3235 :                 return self.result;
     282              :             }
     283       407351 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     284              :                 // Changes only after the end of the range
     285            0 :                 self.pad_range(self.key_range.clone());
     286            0 :                 return self.result;
     287              :             }
     288       407351 :             Some(layer_type) => {
     289              :                 // Changes for the range exist.
     290       407351 :                 let coverage_start = layer_type.next_change_at_key();
     291       407351 :                 let range_before = self.key_range.start..coverage_start;
     292       407351 :                 self.pad_range(range_before);
     293              : 
     294       407351 :                 self.advance(&layer_type);
     295       407351 :                 coverage_start
     296              :             }
     297              :         };
     298              : 
     299       822372 :         while current_range_start < self.key_range.end {
     300       415021 :             let next_layer_type = self.choose_next_layer_type();
     301       415021 :             match next_layer_type {
     302         7670 :                 Some(t) => {
     303         7670 :                     let current_range_end = t.next_change_at_key();
     304         7670 :                     self.add_range(current_range_start..current_range_end);
     305         7670 :                     current_range_start = current_range_end;
     306         7670 : 
     307         7670 :                     self.advance(&t);
     308         7670 :                 }
     309       407351 :                 None => {
     310       407351 :                     self.add_range(current_range_start..self.key_range.end);
     311       407351 :                     current_range_start = self.key_range.end;
     312       407351 :                 }
     313              :             }
     314              :         }
     315              : 
     316       407351 :         self.result
     317       410586 :     }
     318              : 
     319              :     /// Map a range which does not intersect any persistent layers to
     320              :     /// the in-memory layer candidate.
     321       411057 :     fn pad_range(&mut self, key_range: Range<Key>) {
     322       411057 :         if !key_range.is_empty() {
     323         4693 :             if let Some(ref inmem) = self.in_memory_layer {
     324         1589 :                 let search_result = SearchResult {
     325         1589 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()),
     326         1589 :                     lsn_floor: inmem.get_lsn_range().start,
     327         1589 :                 };
     328         1589 : 
     329         1589 :                 self.result
     330         1589 :                     .found
     331         1589 :                     .entry(search_result)
     332         1589 :                     .or_default()
     333         1589 :                     .add_range(key_range);
     334         3104 :             }
     335       406364 :         }
     336       411057 :     }
     337              : 
     338              :     /// Select the appropiate layer for the given range and update
     339              :     /// the collector.
     340       415021 :     fn add_range(&mut self, covered_range: Range<Key>) {
     341       415021 :         let selected = LayerMap::select_layer(
     342       415021 :             self.current_delta.clone(),
     343       415021 :             self.current_image.clone(),
     344       415021 :             self.in_memory_layer.clone(),
     345       415021 :             self.end_lsn,
     346              :         );
     347              : 
     348       415021 :         match selected {
     349       414550 :             Some(search_result) => self
     350       414550 :                 .result
     351       414550 :                 .found
     352       414550 :                 .entry(search_result)
     353       414550 :                 .or_default()
     354       414550 :                 .add_range(covered_range),
     355          471 :             None => self.pad_range(covered_range),
     356              :         }
     357       415021 :     }
     358              : 
     359              :     /// Move to the next coverage change.
     360       415021 :     fn advance(&mut self, layer_type: &NextLayerType) {
     361       415021 :         match layer_type {
     362        40476 :             NextLayerType::Delta(_) => {
     363        40476 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     364        40476 :                 self.current_delta = layer;
     365        40476 :             }
     366       159370 :             NextLayerType::Image(_) => {
     367       159370 :                 let (_, layer) = self.image_coverage.next().unwrap();
     368       159370 :                 self.current_image = layer;
     369       159370 :             }
     370       215175 :             NextLayerType::Both(_) => {
     371       215175 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     372       215175 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     373       215175 : 
     374       215175 :                 self.current_image = image_layer;
     375       215175 :                 self.current_delta = delta_layer;
     376       215175 :             }
     377              :         }
     378       415021 :     }
     379              : 
     380              :     /// Pick the next coverage change: the one at the lesser key or both if they're alligned.
     381       825607 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     382       825607 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     383       825607 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     384              : 
     385       825607 :         match (next_delta_at, next_image_at) {
     386       410586 :             (None, None) => None,
     387        37955 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     388       158934 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     389       218132 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     390          436 :                 Some(NextLayerType::Image(*next_image_at))
     391              :             }
     392       217696 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     393         2521 :                 Some(NextLayerType::Delta(*next_delta_at))
     394              :             }
     395       215175 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     396              :         }
     397       825607 :     }
     398              : }
     399              : 
     400              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     401              : pub struct InMemoryLayerDesc {
     402              :     handle: InMemoryLayerHandle,
     403              :     lsn_range: Range<Lsn>,
     404              : }
     405              : 
     406              : impl InMemoryLayerDesc {
     407       644590 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     408       644590 :         self.lsn_range.clone()
     409       644590 :     }
     410              : }
     411              : 
     412              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     413              : enum InMemoryLayerHandle {
     414              :     Open,
     415              :     Frozen(usize),
     416              : }
     417              : 
     418              : impl LayerMap {
     419              :     ///
     420              :     /// Find the latest layer (by lsn.end) that covers the given
     421              :     /// 'key', with lsn.start < 'end_lsn'.
     422              :     ///
     423              :     /// The caller of this function is the page reconstruction
     424              :     /// algorithm looking for the next relevant delta layer, or
     425              :     /// the terminal image layer. The caller will pass the lsn_floor
     426              :     /// value as end_lsn in the next call to search.
     427              :     ///
     428              :     /// If there's an image layer exactly below the given end_lsn,
     429              :     /// search should return that layer regardless if there are
     430              :     /// overlapping deltas.
     431              :     ///
     432              :     /// If the latest layer is a delta and there is an overlapping
     433              :     /// image with it below, the lsn_floor returned should be right
     434              :     /// above that image so we don't skip it in the search. Otherwise
     435              :     /// the lsn_floor returned should be the bottom of the delta layer
     436              :     /// because we should make as much progress down the lsn axis
     437              :     /// as possible. It's fine if this way we skip some overlapping
     438              :     /// deltas, because the delta we returned would contain the same
     439              :     /// wal content.
     440              :     ///
     441              :     /// TODO: This API is convoluted and inefficient. If the caller
     442              :     /// makes N search calls, we'll end up finding the same latest
     443              :     /// image layer N times. We should either cache the latest image
     444              :     /// layer result, or simplify the api to `get_latest_image` and
     445              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     446              :     ///
     447        71981 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     448        71981 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     449              : 
     450        71981 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     451        71981 :             Some(version) => version,
     452              :             None => {
     453            0 :                 return in_memory_layer.map(|desc| SearchResult {
     454            0 :                     lsn_floor: desc.get_lsn_range().start,
     455            0 :                     layer: ReadableLayerWeak::InMemoryLayer(desc),
     456            0 :                 });
     457              :             }
     458              :         };
     459              : 
     460        71981 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     461        71981 :         let latest_image = version.image_coverage.query(key.to_i128());
     462              : 
     463        71981 :         Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn)
     464        71981 :     }
     465              : 
     466              :     /// Select a layer from three potential candidates (in-memory, delta and image layer).
     467              :     /// The candidates represent the first layer of each type which intersect a key range.
     468              :     ///
     469              :     /// Layer types have an in implicit priority (image > delta > in-memory). For instance,
     470              :     /// if we have the option of reading an LSN range from both an image and a delta, we
     471              :     /// should read from the image.
     472       732792 :     fn select_layer(
     473       732792 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     474       732792 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     475       732792 :         in_memory_layer: Option<InMemoryLayerDesc>,
     476       732792 :         end_lsn: Lsn,
     477       732792 :     ) -> Option<SearchResult> {
     478       732792 :         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
     479       732792 :         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
     480              : 
     481       732792 :         match (delta_layer, image_layer, in_memory_layer) {
     482         5802 :             (None, None, None) => None,
     483        28867 :             (None, Some(image), None) => {
     484        28867 :                 let lsn_floor = image.get_lsn_range().start;
     485        28867 :                 Some(SearchResult {
     486        28867 :                     layer: ReadableLayerWeak::PersistentLayer(image),
     487        28867 :                     lsn_floor,
     488        28867 :                 })
     489              :             }
     490        39586 :             (Some(delta), None, None) => {
     491        39586 :                 let lsn_floor = delta.get_lsn_range().start;
     492        39586 :                 Some(SearchResult {
     493        39586 :                     layer: ReadableLayerWeak::PersistentLayer(delta),
     494        39586 :                     lsn_floor,
     495        39586 :                 })
     496              :             }
     497       238380 :             (Some(delta), Some(image), None) => {
     498       238380 :                 let img_lsn = image.get_lsn_range().start;
     499       238380 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     500       238380 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     501       238380 :                 if image_is_newer || image_exact_match {
     502        21314 :                     Some(SearchResult {
     503        21314 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     504        21314 :                         lsn_floor: img_lsn,
     505        21314 :                     })
     506              :                 } else {
     507              :                     // If the delta overlaps with the image in the LSN dimension, do a partial
     508              :                     // up to the image layer.
     509       217066 :                     let lsn_floor =
     510       217066 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     511       217066 :                     Some(SearchResult {
     512       217066 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     513       217066 :                         lsn_floor,
     514       217066 :                     })
     515              :                 }
     516              :             }
     517         5791 :             (None, None, Some(inmem)) => {
     518         5791 :                 let lsn_floor = inmem.get_lsn_range().start;
     519         5791 :                 Some(SearchResult {
     520         5791 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     521         5791 :                     lsn_floor,
     522         5791 :                 })
     523              :             }
     524       169584 :             (None, Some(image), Some(inmem)) => {
     525              :                 // If the in-memory layer overlaps with the image in the LSN dimension, do a partial
     526              :                 // up to the image layer.
     527       169584 :                 let img_lsn = image.get_lsn_range().start;
     528       169584 :                 let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end;
     529       169584 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     530       169584 :                 if image_is_newer || image_exact_match {
     531          437 :                     Some(SearchResult {
     532          437 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     533          437 :                         lsn_floor: img_lsn,
     534          437 :                     })
     535              :                 } else {
     536       169147 :                     let lsn_floor =
     537       169147 :                         std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1);
     538       169147 :                     Some(SearchResult {
     539       169147 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     540       169147 :                         lsn_floor,
     541       169147 :                     })
     542              :                 }
     543              :             }
     544       121899 :             (Some(delta), None, Some(inmem)) => {
     545              :                 // Overlaps between delta and in-memory layers are not a valid
     546              :                 // state, but we handle them here for completeness.
     547       121899 :                 let delta_end = delta.get_lsn_range().end;
     548       121899 :                 let delta_is_newer = delta_end >= inmem.get_lsn_range().end;
     549       121899 :                 let delta_exact_match = delta_end == end_lsn;
     550       121899 :                 if delta_is_newer || delta_exact_match {
     551            4 :                     Some(SearchResult {
     552            4 :                         lsn_floor: delta.get_lsn_range().start,
     553            4 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     554            4 :                     })
     555              :                 } else {
     556              :                     // If the in-memory layer overlaps with the delta in the LSN dimension, do a partial
     557              :                     // up to the delta layer.
     558       121895 :                     let lsn_floor =
     559       121895 :                         std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end);
     560       121895 :                     Some(SearchResult {
     561       121895 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     562       121895 :                         lsn_floor,
     563       121895 :                     })
     564              :                 }
     565              :             }
     566       122883 :             (Some(delta), Some(image), Some(inmem)) => {
     567              :                 // Determine the preferred persistent layer without taking the in-memory layer
     568              :                 // into consideration.
     569       122883 :                 let persistent_res =
     570       122883 :                     Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn)
     571       122883 :                         .unwrap();
     572       122883 :                 let persistent_l = match persistent_res.layer {
     573       122883 :                     ReadableLayerWeak::PersistentLayer(l) => l,
     574            0 :                     ReadableLayerWeak::InMemoryLayer(_) => unreachable!(),
     575              :                 };
     576              : 
     577              :                 // Now handle the in-memory layer overlaps.
     578       122883 :                 let inmem_res = if persistent_l.is_delta() {
     579       116912 :                     Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn)
     580       116912 :                         .unwrap()
     581              :                 } else {
     582         5971 :                     Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn)
     583         5971 :                         .unwrap()
     584              :                 };
     585              : 
     586       122883 :                 Some(SearchResult {
     587       122883 :                     layer: inmem_res.layer,
     588       122883 :                     // Use the more restrictive LSN floor
     589       122883 :                     lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor),
     590       122883 :                 })
     591              :             }
     592              :         }
     593       732792 :     }
     594              : 
     595       578081 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     596       578081 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     597              : 
     598       578081 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     599       410586 :             Some(version) => version,
     600              :             None => {
     601       167495 :                 return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range);
     602              :             }
     603              :         };
     604              : 
     605       410586 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     606       410586 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     607       410586 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     608              : 
     609       410586 :         let collector = RangeSearchCollector::new(
     610       410586 :             key_range,
     611       410586 :             end_lsn,
     612       410586 :             in_memory_layer,
     613       410586 :             delta_changes,
     614       410586 :             image_changes,
     615              :         );
     616       410586 :         collector.collect()
     617       578081 :     }
     618              : 
     619              :     /// Start a batch of updates, applied on drop
     620          819 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     621          819 :         BatchedUpdates { layer_map: self }
     622          819 :     }
     623              : 
     624              :     ///
     625              :     /// Insert an on-disk layer
     626              :     ///
     627              :     /// Helper function for BatchedUpdates::insert_historic
     628              :     ///
     629              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     630         2480 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     631              :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     632              : 
     633         2480 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     634          500 :             self.l0_delta_layers.push(layer_desc.clone().into());
     635          500 :             self.watch_l0_deltas
     636          500 :                 .send_replace(self.l0_delta_layers.len());
     637         1980 :         }
     638              : 
     639         2480 :         self.historic.insert(
     640         2480 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     641         2480 :             layer_desc.into(),
     642              :         );
     643         2480 :     }
     644              : 
     645              :     ///
     646              :     /// Remove an on-disk layer from the map.
     647              :     ///
     648              :     /// Helper function for BatchedUpdates::remove_historic
     649              :     ///
     650          261 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     651          261 :         self.historic
     652          261 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     653          261 :         let layer_key = layer_desc.key();
     654          261 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     655          202 :             let len_before = self.l0_delta_layers.len();
     656          202 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     657         2717 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     658          202 :             self.l0_delta_layers = l0_delta_layers;
     659          202 :             self.watch_l0_deltas
     660          202 :                 .send_replace(self.l0_delta_layers.len());
     661              :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     662              :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     663              :             // vtable) pairs.
     664          202 :             assert_eq!(
     665          202 :                 self.l0_delta_layers.len(),
     666          202 :                 len_before - 1,
     667            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     668              :             );
     669           59 :         }
     670          261 :     }
     671              : 
     672              :     /// Helper function for BatchedUpdates::drop.
     673          820 :     pub(self) fn flush_updates(&mut self) {
     674          820 :         self.historic.rebuild();
     675          820 :     }
     676              : 
     677              :     /// Is there a newer image layer for given key- and LSN-range? Or a set
     678              :     /// of image layers within the specified lsn range that cover the entire
     679              :     /// specified key range?
     680              :     ///
     681              :     /// This is used for garbage collection, to determine if an old layer can
     682              :     /// be deleted.
     683         5825 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     684         5825 :         if key.is_empty() {
     685              :             // Vacuously true. There's a newer image for all 0 of the kerys in the range.
     686            0 :             return true;
     687         5825 :         }
     688              : 
     689         5825 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     690         5825 :             Some(v) => v,
     691            0 :             None => return false,
     692              :         };
     693              : 
     694         5825 :         let start = key.start.to_i128();
     695         5825 :         let end = key.end.to_i128();
     696              : 
     697         5898 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     698         5875 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     699           23 :             None => false,
     700         5898 :         };
     701              : 
     702              :         // Check the start is covered
     703         5825 :         if !layer_covers(version.image_coverage.query(start)) {
     704         5798 :             return false;
     705           27 :         }
     706              : 
     707              :         // Check after all changes of coverage
     708           73 :         for (_, change_val) in version.image_coverage.range(start..end) {
     709           73 :             if !layer_covers(change_val) {
     710           23 :                 return false;
     711           50 :             }
     712              :         }
     713              : 
     714            4 :         true
     715         5825 :     }
     716              : 
     717         2212 :     pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
     718         2212 :         self.historic.iter()
     719         2212 :     }
     720              : 
     721              :     /// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
     722       650062 :     pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option<InMemoryLayerDesc> {
     723       650062 :         let is_below = |l: &Arc<InMemoryLayer>| {
     724       554430 :             let start_lsn = l.get_lsn_range().start;
     725       554430 :             below > start_lsn
     726       554430 :         };
     727              : 
     728       650062 :         if let Some(open) = &self.open_layer {
     729       452885 :             if is_below(open) {
     730       299510 :                 return Some(InMemoryLayerDesc {
     731       299510 :                     handle: InMemoryLayerHandle::Open,
     732       299510 :                     lsn_range: open.get_lsn_range(),
     733       299510 :                 });
     734       153375 :             }
     735       197177 :         }
     736              : 
     737       350552 :         self.frozen_layers
     738       350552 :             .iter()
     739       350552 :             .enumerate()
     740       350552 :             .rfind(|(_idx, l)| is_below(l))
     741       350552 :             .map(|(idx, l)| InMemoryLayerDesc {
     742        49683 :                 handle: InMemoryLayerHandle::Frozen(idx),
     743        49683 :                 lsn_range: l.get_lsn_range(),
     744        49683 :             })
     745       650062 :     }
     746              : 
     747       311045 :     pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc<InMemoryLayer> {
     748       311045 :         match desc.handle {
     749       299510 :             InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(),
     750        11535 :             InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(),
     751              :         }
     752       311045 :     }
     753              : 
     754              :     ///
     755              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     756              :     /// image layer that covers each range at the specified lsn (inclusive).
     757              :     /// This is used when creating  new image layers.
     758            7 :     pub fn image_coverage(
     759            7 :         &self,
     760            7 :         key_range: &Range<Key>,
     761            7 :         lsn: Lsn,
     762            7 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     763            7 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     764            7 :             Some(v) => v,
     765            0 :             None => return vec![],
     766              :         };
     767              : 
     768            7 :         let start = key_range.start.to_i128();
     769            7 :         let end = key_range.end.to_i128();
     770              : 
     771              :         // Initialize loop variables
     772            7 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     773            7 :         let mut current_key = start;
     774            7 :         let mut current_val = version.image_coverage.query(start);
     775              : 
     776              :         // Loop through the change events and push intervals
     777            7 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     778            0 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     779            0 :             coverage.push((kr, current_val.take()));
     780            0 :             current_key = change_key;
     781            0 :             current_val.clone_from(&change_val);
     782            0 :         }
     783              : 
     784              :         // Add the final interval
     785            7 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     786            7 :         coverage.push((kr, current_val.take()));
     787              : 
     788            7 :         coverage
     789            7 :     }
     790              : 
     791              :     /// Check if the key range resembles that of an L0 layer.
     792         4346 :     pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
     793         4346 :         is_delta_layer && key_range == &(Key::MIN..Key::MAX)
     794         4346 :     }
     795              : 
     796              :     /// This function determines which layers are counted in `count_deltas`:
     797              :     /// layers that should count towards deciding whether or not to reimage
     798              :     /// a certain partition range.
     799              :     ///
     800              :     /// There are two kinds of layers we currently consider reimage-worthy:
     801              :     ///
     802              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     803              :     /// TODO Some of these layers are very sparse and cover the entire key
     804              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     805              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     806              :     ///      based on some of these factors:
     807              :     ///      a) whether this layer has any wal in this partition range
     808              :     ///      b) the size of the layer
     809              :     ///      c) the number of images needed to cover it
     810              :     ///      d) the estimated time until we'll have to reimage over it for GC
     811              :     ///
     812              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     813              :     /// them reimage-worthy only when the entire key space can be covered by very few
     814              :     /// images (currently 1).
     815              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     816              :     ///      implement that we need to plumb a lot more context into this function
     817              :     ///      than just the current partition_range.
     818            0 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     819              :         // Case 1
     820            0 :         if !Self::is_l0(&layer.key_range, layer.is_delta) {
     821            0 :             return true;
     822            0 :         }
     823              : 
     824              :         // Case 2
     825            0 :         if partition_range == &(Key::MIN..Key::MAX) {
     826            0 :             return true;
     827            0 :         }
     828              : 
     829            0 :         false
     830            0 :     }
     831              : 
     832              :     /// Count the height of the tallest stack of reimage-worthy deltas
     833              :     /// in this 2d region.
     834              :     ///
     835              :     /// If `limit` is provided we don't try to count above that number.
     836              :     ///
     837              :     /// This number is used to compute the largest number of deltas that
     838              :     /// we'll need to visit for any page reconstruction in this region.
     839              :     /// We use this heuristic to decide whether to create an image layer.
     840            7 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     841              :         // We get the delta coverage of the region, and for each part of the coverage
     842              :         // we recurse right underneath the delta. The recursion depth is limited by
     843              :         // the largest result this function could return, which is in practice between
     844              :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     845              : 
     846            7 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     847            0 :             return 0;
     848            7 :         }
     849              : 
     850            7 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     851            7 :             Some(v) => v,
     852            0 :             None => return 0,
     853              :         };
     854              : 
     855            7 :         let start = key.start.to_i128();
     856            7 :         let end = key.end.to_i128();
     857              : 
     858              :         // Initialize loop variables
     859            7 :         let mut max_stacked_deltas = 0;
     860            7 :         let mut current_key = start;
     861            7 :         let mut current_val = version.delta_coverage.query(start);
     862              : 
     863              :         // Loop through the delta coverage and recurse on each part
     864            7 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     865              :             // If there's a relevant delta in this part, add 1 and recurse down
     866            0 :             if let Some(val) = &current_val {
     867            0 :                 if val.get_lsn_range().end > lsn.start {
     868            0 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     869            0 :                     let lr = lsn.start..val.get_lsn_range().start;
     870            0 :                     if !kr.is_empty() {
     871            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
     872            0 :                         let new_limit = limit.map(|l| l - base_count);
     873            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     874            0 :                         max_stacked_deltas = std::cmp::max(
     875            0 :                             max_stacked_deltas,
     876            0 :                             base_count + max_stacked_deltas_underneath,
     877            0 :                         );
     878            0 :                     }
     879            0 :                 }
     880            0 :             }
     881              : 
     882            0 :             current_key = change_key;
     883            0 :             current_val.clone_from(&change_val);
     884              :         }
     885              : 
     886              :         // Consider the last part
     887            7 :         if let Some(val) = &current_val {
     888            0 :             if val.get_lsn_range().end > lsn.start {
     889            0 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     890            0 :                 let lr = lsn.start..val.get_lsn_range().start;
     891              : 
     892            0 :                 if !kr.is_empty() {
     893            0 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     894            0 :                     let new_limit = limit.map(|l| l - base_count);
     895            0 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     896            0 :                     max_stacked_deltas = std::cmp::max(
     897            0 :                         max_stacked_deltas,
     898            0 :                         base_count + max_stacked_deltas_underneath,
     899            0 :                     );
     900            0 :                 }
     901            0 :             }
     902            7 :         }
     903              : 
     904            7 :         max_stacked_deltas
     905            7 :     }
     906              : 
     907              :     /// Return all L0 delta layers
     908          845 :     pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
     909          845 :         &self.l0_delta_layers
     910          845 :     }
     911              : 
     912              :     /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
     913          231 :     pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
     914          231 :         self.watch_l0_deltas.subscribe()
     915          231 :     }
     916              : 
     917              :     /// debugging function to print out the contents of the layer map
     918              :     #[allow(unused)]
     919            1 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     920            1 :         println!("Begin dump LayerMap");
     921              : 
     922            1 :         println!("open_layer:");
     923            1 :         if let Some(open_layer) = &self.open_layer {
     924            0 :             open_layer.dump(verbose, ctx).await?;
     925            1 :         }
     926              : 
     927            1 :         println!("frozen_layers:");
     928            1 :         for frozen_layer in self.frozen_layers.iter() {
     929            0 :             frozen_layer.dump(verbose, ctx).await?;
     930              :         }
     931              : 
     932            1 :         println!("historic_layers:");
     933            6 :         for desc in self.iter_historic_layers() {
     934            6 :             desc.dump();
     935            6 :         }
     936            1 :         println!("End dump LayerMap");
     937            1 :         Ok(())
     938            1 :     }
     939              : 
     940              :     /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
     941              :     /// where we expect to serve reads.
     942              :     ///
     943              :     /// This function is O(N) and should be called infrequently.  The caller is responsible for
     944              :     /// looking up and updating the Layer objects for these layer descriptors.
     945          124 :     pub fn get_visibility(
     946          124 :         &self,
     947          124 :         mut read_points: Vec<Lsn>,
     948          124 :     ) -> (
     949          124 :         Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
     950          124 :         KeySpace,
     951          124 :     ) {
     952              :         // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
     953              :         // KeySpace is intended to be composed statically and iterated over.
     954              :         struct KeyShadow {
     955              :             // Map of range start to range end
     956              :             inner: RangeSetBlaze<i128>,
     957              :         }
     958              : 
     959              :         impl KeyShadow {
     960          124 :             fn new() -> Self {
     961          124 :                 Self {
     962          124 :                     inner: Default::default(),
     963          124 :                 }
     964          124 :             }
     965              : 
     966          810 :             fn contains(&self, range: Range<Key>) -> bool {
     967          810 :                 let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
     968          810 :                 self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
     969          810 :                     CheckSortedDisjoint::from([range_incl]),
     970          810 :                 ))
     971          810 :             }
     972              : 
     973              :             /// Add the input range to the keys covered by self.
     974              :             ///
     975              :             /// Return true if inserting this range covered some keys that were previously not covered
     976          956 :             fn cover(&mut self, insert: Range<Key>) -> bool {
     977          956 :                 let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
     978          956 :                 self.inner.ranges_insert(range_incl)
     979          956 :             }
     980              : 
     981          127 :             fn reset(&mut self) {
     982          127 :                 self.inner = Default::default();
     983          127 :             }
     984              : 
     985          124 :             fn to_keyspace(&self) -> KeySpace {
     986          124 :                 let mut accum = KeySpaceAccum::new();
     987          126 :                 for range_incl in self.inner.ranges() {
     988          126 :                     let range = Range {
     989          126 :                         start: Key::from_i128(*range_incl.start()),
     990          126 :                         end: Key::from_i128(range_incl.end() + 1),
     991          126 :                     };
     992          126 :                     accum.add_range(range)
     993              :                 }
     994              : 
     995          124 :                 accum.to_keyspace()
     996          124 :             }
     997              :         }
     998              : 
     999              :         // The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
    1000              :         // and a ReadPoint
    1001          124 :         read_points.sort_by_key(|rp| rp.0);
    1002          124 :         let mut shadow = KeyShadow::new();
    1003              : 
    1004              :         // We will interleave all our read points and layers into a sorted collection
    1005              :         enum Item {
    1006              :             ReadPoint { lsn: Lsn },
    1007              :             Layer(Arc<PersistentLayerDesc>),
    1008              :         }
    1009              : 
    1010          124 :         let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
    1011          124 :         items.extend(self.iter_historic_layers().map(Item::Layer));
    1012          124 :         items.extend(
    1013          124 :             read_points
    1014          124 :                 .into_iter()
    1015          127 :                 .map(|rp| Item::ReadPoint { lsn: rp }),
    1016              :         );
    1017              : 
    1018              :         // Ordering: we want to iterate like this:
    1019              :         // 1. Highest LSNs first
    1020              :         // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
    1021              :         // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
    1022        31934 :         items.sort_by_key(|item| {
    1023        31934 :             std::cmp::Reverse(match item {
    1024        31700 :                 Item::Layer(layer) => {
    1025        31700 :                     if layer.is_delta() {
    1026        14950 :                         (Lsn(layer.get_lsn_range().end.0 - 1), 0)
    1027              :                     } else {
    1028        16750 :                         (layer.image_layer_lsn(), 1)
    1029              :                     }
    1030              :                 }
    1031          234 :                 Item::ReadPoint { lsn } => (*lsn, 2),
    1032              :             })
    1033        31934 :         });
    1034              : 
    1035          124 :         let mut results = Vec::with_capacity(self.historic.len());
    1036              : 
    1037          124 :         let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
    1038              : 
    1039         2017 :         for item in items {
    1040         1893 :             let (reached_lsn, is_readpoint) = match &item {
    1041          127 :                 Item::ReadPoint { lsn } => (lsn, true),
    1042         1766 :                 Item::Layer(layer) => (&layer.lsn_range.start, false),
    1043              :             };
    1044         1893 :             maybe_covered_deltas.retain(|d| {
    1045           50 :                 if *reached_lsn >= d.lsn_range.start && is_readpoint {
    1046              :                     // We encountered a readpoint within the delta layer: it is visible
    1047              : 
    1048            1 :                     results.push((d.clone(), LayerVisibilityHint::Visible));
    1049            1 :                     false
    1050           49 :                 } else if *reached_lsn < d.lsn_range.start {
    1051              :                     // We passed the layer's range without encountering a read point: it is not visible
    1052           19 :                     results.push((d.clone(), LayerVisibilityHint::Covered));
    1053           19 :                     false
    1054              :                 } else {
    1055              :                     // We're still in the delta layer: continue iterating
    1056           30 :                     true
    1057              :                 }
    1058           50 :             });
    1059              : 
    1060         1893 :             match item {
    1061          127 :                 Item::ReadPoint { lsn: _lsn } => {
    1062          127 :                     // TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
    1063          127 :                     // to assume that the whole key range is visible at the branch point.
    1064          127 :                     shadow.reset();
    1065          127 :                 }
    1066         1766 :                 Item::Layer(layer) => {
    1067         1766 :                     let visibility = if layer.is_delta() {
    1068          810 :                         if shadow.contains(layer.get_key_range()) {
    1069              :                             // If a layer isn't visible based on current state, we must defer deciding whether
    1070              :                             // it is truly not visible until we have advanced past the delta's range: we might
    1071              :                             // encounter another branch point within this delta layer's LSN range.
    1072           22 :                             maybe_covered_deltas.push(layer);
    1073           22 :                             continue;
    1074              :                         } else {
    1075          788 :                             LayerVisibilityHint::Visible
    1076              :                         }
    1077              :                     } else {
    1078          956 :                         let modified = shadow.cover(layer.get_key_range());
    1079          956 :                         if modified {
    1080              :                             // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
    1081          950 :                             LayerVisibilityHint::Visible
    1082              :                         } else {
    1083              :                             // An image layer in a region that was already covered
    1084            6 :                             LayerVisibilityHint::Covered
    1085              :                         }
    1086              :                     };
    1087              : 
    1088         1744 :                     results.push((layer, visibility));
    1089              :                 }
    1090              :             }
    1091              :         }
    1092              : 
    1093              :         // Drain any remaining maybe_covered deltas
    1094          124 :         results.extend(
    1095          124 :             maybe_covered_deltas
    1096          124 :                 .into_iter()
    1097          124 :                 .map(|d| (d, LayerVisibilityHint::Covered)),
    1098              :         );
    1099              : 
    1100          124 :         (results, shadow.to_keyspace())
    1101          124 :     }
    1102              : }
    1103              : 
    1104              : #[cfg(test)]
    1105              : mod tests {
    1106              :     use std::collections::HashMap;
    1107              :     use std::path::PathBuf;
    1108              : 
    1109              :     use crate::{
    1110              :         DEFAULT_PG_VERSION,
    1111              :         tenant::{harness::TenantHarness, storage_layer::LayerName},
    1112              :     };
    1113              :     use pageserver_api::key::DBDIR_KEY;
    1114              :     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
    1115              :     use tokio_util::sync::CancellationToken;
    1116              :     use utils::id::{TenantId, TimelineId};
    1117              :     use utils::shard::TenantShardId;
    1118              : 
    1119              :     use super::*;
    1120              :     use crate::tenant::IndexPart;
    1121              : 
    1122              :     #[derive(Clone)]
    1123              :     struct LayerDesc {
    1124              :         key_range: Range<Key>,
    1125              :         lsn_range: Range<Lsn>,
    1126              :         is_delta: bool,
    1127              :     }
    1128              : 
    1129            1 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
    1130            1 :         let mut layer_map = LayerMap::default();
    1131              : 
    1132            6 :         for layer in layers {
    1133            5 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
    1134            5 :                 layer.key_range,
    1135            5 :                 layer.lsn_range,
    1136            5 :                 layer.is_delta,
    1137            5 :             ));
    1138            5 :         }
    1139              : 
    1140            1 :         layer_map.flush_updates();
    1141            1 :         layer_map
    1142            1 :     }
    1143              : 
    1144         3541 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
    1145         3541 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
    1146         3541 :             .found
    1147         3541 :             .into_iter()
    1148         6820 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1149         3541 :             .collect();
    1150         3541 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
    1151         3541 :             .found
    1152         3541 :             .into_iter()
    1153         6820 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1154         3541 :             .collect();
    1155              : 
    1156         3541 :         assert_eq!(lhs, rhs);
    1157         3541 :     }
    1158              : 
    1159              :     #[cfg(test)]
    1160         3540 :     fn brute_force_range_search(
    1161         3540 :         layer_map: &LayerMap,
    1162         3540 :         key_range: Range<Key>,
    1163         3540 :         end_lsn: Lsn,
    1164         3540 :     ) -> RangeSearchResult {
    1165         3540 :         let mut range_search_result = RangeSearchResult::new();
    1166              : 
    1167         3540 :         let mut key = key_range.start;
    1168        75520 :         while key != key_range.end {
    1169        71980 :             let res = layer_map.search(key, end_lsn);
    1170        71980 :             if let Some(res) = res {
    1171        66650 :                 range_search_result
    1172        66650 :                     .found
    1173        66650 :                     .entry(res)
    1174        66650 :                     .or_default()
    1175        66650 :                     .add_key(key);
    1176        66650 :             }
    1177              : 
    1178        71980 :             key = key.next();
    1179              :         }
    1180              : 
    1181         3540 :         range_search_result
    1182         3540 :     }
    1183              : 
    1184              :     #[test]
    1185            1 :     fn ranged_search_on_empty_layer_map() {
    1186            1 :         let layer_map = LayerMap::default();
    1187            1 :         let range = Key::from_i128(100)..Key::from_i128(200);
    1188              : 
    1189            1 :         let res = layer_map.range_search(range.clone(), Lsn(100));
    1190            1 :         assert_range_search_result_eq(res, RangeSearchResult::new());
    1191            1 :     }
    1192              : 
    1193              :     #[tokio::test]
    1194            1 :     async fn ranged_search() {
    1195            1 :         let harness = TenantHarness::create("ranged_search").await.unwrap();
    1196            1 :         let (tenant, ctx) = harness.load().await;
    1197            1 :         let cancel = CancellationToken::new();
    1198            1 :         let timeline_id = TimelineId::generate();
    1199              :         // Create the timeline such that the in-memory layers can be written
    1200              :         // to the timeline directory.
    1201            1 :         tenant
    1202            1 :             .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1203            1 :             .await
    1204            1 :             .unwrap();
    1205              : 
    1206            1 :         let gate = utils::sync::gate::Gate::default();
    1207            2 :         let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range<Lsn>| {
    1208            1 :             let layer = InMemoryLayer::create(
    1209            1 :                 harness.conf,
    1210            1 :                 timeline_id,
    1211            1 :                 harness.tenant_shard_id,
    1212            1 :                 lsn_range.start,
    1213            1 :                 &gate,
    1214            1 :                 &cancel,
    1215            1 :                 &ctx,
    1216            1 :             )
    1217            1 :             .await
    1218            1 :             .unwrap();
    1219              : 
    1220            1 :             layer.freeze(lsn_range.end).await;
    1221              : 
    1222            1 :             layer_map.frozen_layers.push_back(Arc::new(layer));
    1223            2 :         };
    1224              : 
    1225            1 :         let in_memory_layer_configurations = [
    1226            1 :             vec![],
    1227            1 :             // Overlaps with the top-most image
    1228            1 :             vec![Lsn(35)..Lsn(50)],
    1229            1 :         ];
    1230              : 
    1231            1 :         let layers = vec![
    1232            1 :             LayerDesc {
    1233            1 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
    1234            1 :                 lsn_range: Lsn(5)..Lsn(6),
    1235            1 :                 is_delta: false,
    1236            1 :             },
    1237            1 :             LayerDesc {
    1238            1 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
    1239            1 :                 lsn_range: Lsn(5)..Lsn(20),
    1240            1 :                 is_delta: true,
    1241            1 :             },
    1242            1 :             LayerDesc {
    1243            1 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
    1244            1 :                 lsn_range: Lsn(20)..Lsn(30),
    1245            1 :                 is_delta: true,
    1246            1 :             },
    1247            1 :             LayerDesc {
    1248            1 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1249            1 :                 lsn_range: Lsn(25)..Lsn(35),
    1250            1 :                 is_delta: true,
    1251            1 :             },
    1252            1 :             LayerDesc {
    1253            1 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1254            1 :                 lsn_range: Lsn(40)..Lsn(41),
    1255            1 :                 is_delta: false,
    1256            1 :             },
    1257              :         ];
    1258              : 
    1259            1 :         let mut layer_map = create_layer_map(layers.clone());
    1260            3 :         for in_memory_layers in in_memory_layer_configurations {
    1261            3 :             for in_mem_layer_range in in_memory_layers {
    1262            1 :                 add_in_memory_layer(&mut layer_map, in_mem_layer_range).await;
    1263            1 :             }
    1264            1 : 
    1265          122 :             for start in 0..60 {
    1266         3540 :                 for end in (start + 1)..60 {
    1267         3540 :                     let range = Key::from_i128(start)..Key::from_i128(end);
    1268         3540 :                     let result = layer_map.range_search(range.clone(), Lsn(100));
    1269         3540 :                     let expected = brute_force_range_search(&layer_map, range, Lsn(100));
    1270         3540 : 
    1271         3540 :                     eprintln!("{start}..{end}: {result:?}");
    1272         3540 : 
    1273         3540 :                     assert_range_search_result_eq(result, expected);
    1274         3540 :                 }
    1275            1 :             }
    1276            1 :         }
    1277            1 :     }
    1278              : 
    1279              :     #[test]
    1280            1 :     fn layer_visibility_basic() {
    1281              :         // A simple synthetic input, as a smoke test.
    1282            1 :         let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
    1283            1 :         let timeline_id = TimelineId::generate();
    1284            1 :         let mut layer_map = LayerMap::default();
    1285            1 :         let mut updates = layer_map.batch_update();
    1286              : 
    1287              :         const FAKE_LAYER_SIZE: u64 = 1024;
    1288              : 
    1289            1 :         let inject_delta = |updates: &mut BatchedUpdates,
    1290              :                             key_start: i128,
    1291              :                             key_end: i128,
    1292              :                             lsn_start: u64,
    1293            7 :                             lsn_end: u64| {
    1294            7 :             let desc = PersistentLayerDesc::new_delta(
    1295            7 :                 tenant_shard_id,
    1296            7 :                 timeline_id,
    1297            7 :                 Range {
    1298            7 :                     start: Key::from_i128(key_start),
    1299            7 :                     end: Key::from_i128(key_end),
    1300            7 :                 },
    1301            7 :                 Range {
    1302            7 :                     start: Lsn(lsn_start),
    1303            7 :                     end: Lsn(lsn_end),
    1304            7 :                 },
    1305              :                 1024,
    1306              :             );
    1307            7 :             updates.insert_historic(desc.clone());
    1308            7 :             desc
    1309            7 :         };
    1310              : 
    1311            1 :         let inject_image =
    1312            6 :             |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
    1313            6 :                 let desc = PersistentLayerDesc::new_img(
    1314            6 :                     tenant_shard_id,
    1315            6 :                     timeline_id,
    1316            6 :                     Range {
    1317            6 :                         start: Key::from_i128(key_start),
    1318            6 :                         end: Key::from_i128(key_end),
    1319            6 :                     },
    1320            6 :                     Lsn(lsn),
    1321              :                     FAKE_LAYER_SIZE,
    1322              :                 );
    1323            6 :                 updates.insert_historic(desc.clone());
    1324            6 :                 desc
    1325            6 :             };
    1326              : 
    1327              :         //
    1328              :         // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
    1329              :         // we expect to handle.  You can follow these examples through in the same order as they would be processed
    1330              :         // by the function under test.
    1331              :         //
    1332              : 
    1333            1 :         let mut read_points = vec![Lsn(1000)];
    1334              : 
    1335              :         // A delta ahead of any image layer
    1336            1 :         let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
    1337              : 
    1338              :         // An image layer is visible and covers some layers beneath itself
    1339            1 :         let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
    1340              : 
    1341              :         // A delta layer covered by the image layer: should be covered
    1342            1 :         let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
    1343              : 
    1344              :         // A delta layer partially covered by an image layer: should be visible
    1345            1 :         let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
    1346              : 
    1347              :         // A delta layer not covered by an image layer: should be visible
    1348            1 :         let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
    1349              : 
    1350              :         // An image layer covered by the image layer above: should be covered
    1351            1 :         let covered_image = inject_image(&mut updates, 10, 20, 89);
    1352              : 
    1353              :         // An image layer partially covered by an image layer: should be visible
    1354            1 :         let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
    1355              : 
    1356              :         // An image layer not covered by an image layer: should be visible
    1357            1 :         let not_covered_image = inject_image(&mut updates, 1, 4, 89);
    1358              : 
    1359              :         // A read point: this will make subsequent layers below here visible, even if there are
    1360              :         // more recent layers covering them.
    1361            1 :         read_points.push(Lsn(80));
    1362              : 
    1363              :         // A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
    1364            1 :         let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
    1365              : 
    1366              :         // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
    1367              :         // the read point should make it visible, even though its end LSN is covered
    1368            1 :         let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
    1369            1 :         let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
    1370            1 :         read_points.push(Lsn(65));
    1371            1 :         let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
    1372              : 
    1373            1 :         let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
    1374              : 
    1375            1 :         updates.flush();
    1376              : 
    1377            1 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1378            1 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1379              : 
    1380            1 :         assert_eq!(
    1381            1 :             layer_visibilities.get(&ahead_layer),
    1382              :             Some(&LayerVisibilityHint::Visible)
    1383              :         );
    1384            1 :         assert_eq!(
    1385            1 :             layer_visibilities.get(&visible_covering_img),
    1386              :             Some(&LayerVisibilityHint::Visible)
    1387              :         );
    1388            1 :         assert_eq!(
    1389            1 :             layer_visibilities.get(&covered_delta),
    1390              :             Some(&LayerVisibilityHint::Covered)
    1391              :         );
    1392            1 :         assert_eq!(
    1393            1 :             layer_visibilities.get(&partially_covered_delta),
    1394              :             Some(&LayerVisibilityHint::Visible)
    1395              :         );
    1396            1 :         assert_eq!(
    1397            1 :             layer_visibilities.get(&not_covered_delta),
    1398              :             Some(&LayerVisibilityHint::Visible)
    1399              :         );
    1400            1 :         assert_eq!(
    1401            1 :             layer_visibilities.get(&covered_image),
    1402              :             Some(&LayerVisibilityHint::Covered)
    1403              :         );
    1404            1 :         assert_eq!(
    1405            1 :             layer_visibilities.get(&partially_covered_image),
    1406              :             Some(&LayerVisibilityHint::Visible)
    1407              :         );
    1408            1 :         assert_eq!(
    1409            1 :             layer_visibilities.get(&not_covered_image),
    1410              :             Some(&LayerVisibilityHint::Visible)
    1411              :         );
    1412            1 :         assert_eq!(
    1413            1 :             layer_visibilities.get(&covered_delta_below_read_point),
    1414              :             Some(&LayerVisibilityHint::Visible)
    1415              :         );
    1416            1 :         assert_eq!(
    1417            1 :             layer_visibilities.get(&covering_img_between_read_points),
    1418              :             Some(&LayerVisibilityHint::Visible)
    1419              :         );
    1420            1 :         assert_eq!(
    1421            1 :             layer_visibilities.get(&covered_delta_between_read_points),
    1422              :             Some(&LayerVisibilityHint::Covered)
    1423              :         );
    1424            1 :         assert_eq!(
    1425            1 :             layer_visibilities.get(&covered_delta_intersects_read_point),
    1426              :             Some(&LayerVisibilityHint::Visible)
    1427              :         );
    1428            1 :         assert_eq!(
    1429            1 :             layer_visibilities.get(&visible_img_after_last_read_point),
    1430              :             Some(&LayerVisibilityHint::Visible)
    1431              :         );
    1432              : 
    1433              :         // Shadow should include all the images below the last read point
    1434            1 :         let expected_shadow = KeySpace {
    1435            1 :             ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
    1436            1 :         };
    1437            1 :         assert_eq!(shadow, expected_shadow);
    1438            1 :     }
    1439              : 
    1440            1 :     fn fixture_path(relative: &str) -> PathBuf {
    1441            1 :         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
    1442            1 :     }
    1443              : 
    1444              :     #[test]
    1445            1 :     fn layer_visibility_realistic() {
    1446              :         // Load a large example layermap
    1447            1 :         let index_raw = std::fs::read_to_string(fixture_path(
    1448            1 :             "test_data/indices/mixed_workload/index_part.json",
    1449              :         ))
    1450            1 :         .unwrap();
    1451            1 :         let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
    1452              : 
    1453            1 :         let tenant_id = TenantId::generate();
    1454            1 :         let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1455            1 :         let timeline_id = TimelineId::generate();
    1456              : 
    1457            1 :         let mut layer_map = LayerMap::default();
    1458            1 :         let mut updates = layer_map.batch_update();
    1459         1565 :         for (layer_name, layer_metadata) in index.layer_metadata {
    1460         1564 :             let layer_desc = match layer_name {
    1461          802 :                 LayerName::Image(layer_name) => PersistentLayerDesc {
    1462          802 :                     key_range: layer_name.key_range.clone(),
    1463          802 :                     lsn_range: layer_name.lsn_as_range(),
    1464          802 :                     tenant_shard_id,
    1465          802 :                     timeline_id,
    1466          802 :                     is_delta: false,
    1467          802 :                     file_size: layer_metadata.file_size,
    1468          802 :                 },
    1469          762 :                 LayerName::Delta(layer_name) => PersistentLayerDesc {
    1470          762 :                     key_range: layer_name.key_range,
    1471          762 :                     lsn_range: layer_name.lsn_range,
    1472          762 :                     tenant_shard_id,
    1473          762 :                     timeline_id,
    1474          762 :                     is_delta: true,
    1475          762 :                     file_size: layer_metadata.file_size,
    1476          762 :                 },
    1477              :             };
    1478         1564 :             updates.insert_historic(layer_desc);
    1479              :         }
    1480            1 :         updates.flush();
    1481              : 
    1482            1 :         let read_points = vec![index.metadata.disk_consistent_lsn()];
    1483            1 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1484         1565 :         for (layer_desc, visibility) in &layer_visibilities {
    1485         1564 :             tracing::info!("{layer_desc:?}: {visibility:?}");
    1486         1564 :             eprintln!("{layer_desc:?}: {visibility:?}");
    1487              :         }
    1488              : 
    1489              :         // The shadow should be non-empty, since there were some image layers
    1490            1 :         assert!(!shadow.ranges.is_empty());
    1491              : 
    1492              :         // At least some layers should be marked covered
    1493            1 :         assert!(
    1494            1 :             layer_visibilities
    1495            1 :                 .iter()
    1496           19 :                 .any(|i| matches!(i.1, LayerVisibilityHint::Covered))
    1497              :         );
    1498              : 
    1499            1 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1500              : 
    1501              :         // Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
    1502         1565 :         for (layer_desc, visible) in &layer_visibilities {
    1503         1564 :             let mut coverage = KeySpaceRandomAccum::new();
    1504         1564 :             let mut covered_by = Vec::new();
    1505              : 
    1506      2446096 :             for other_layer in layer_map.iter_historic_layers() {
    1507      2446096 :                 if &other_layer == layer_desc {
    1508         1564 :                     continue;
    1509      2444532 :                 }
    1510      2444532 :                 if !other_layer.is_delta()
    1511      1253526 :                     && other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
    1512       631756 :                     && other_layer.key_range.start <= layer_desc.key_range.end
    1513       232652 :                     && layer_desc.key_range.start <= other_layer.key_range.end
    1514        42823 :                 {
    1515        42823 :                     coverage.add_range(other_layer.get_key_range());
    1516        42823 :                     covered_by.push((*other_layer).clone());
    1517      2401709 :                 }
    1518              :             }
    1519         1564 :             let coverage = coverage.to_keyspace();
    1520              : 
    1521         1564 :             let expect_visible = if coverage.ranges.len() == 1
    1522          378 :                 && coverage.contains(&layer_desc.key_range.start)
    1523           17 :                 && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
    1524              :             {
    1525           10 :                 LayerVisibilityHint::Covered
    1526              :             } else {
    1527         1554 :                 LayerVisibilityHint::Visible
    1528              :             };
    1529              : 
    1530         1564 :             if expect_visible != *visible {
    1531            0 :                 eprintln!(
    1532            0 :                     "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
    1533            0 :                     layer_desc.key_range.start,
    1534            0 :                     layer_desc.key_range.end,
    1535            0 :                     layer_desc.lsn_range.start,
    1536            0 :                     layer_desc.lsn_range.end,
    1537            0 :                     layer_desc.is_delta()
    1538              :                 );
    1539            0 :                 if expect_visible == LayerVisibilityHint::Covered {
    1540            0 :                     eprintln!("Covered by:");
    1541            0 :                     for other in covered_by {
    1542            0 :                         eprintln!(
    1543            0 :                             "  {}..{} @ {}",
    1544            0 :                             other.get_key_range().start,
    1545            0 :                             other.get_key_range().end,
    1546            0 :                             other.image_layer_lsn()
    1547            0 :                         );
    1548            0 :                     }
    1549            0 :                     if let Some(range) = coverage.ranges.first() {
    1550            0 :                         eprintln!(
    1551            0 :                             "Total coverage from contributing layers: {}..{}",
    1552            0 :                             range.start, range.end
    1553            0 :                         );
    1554            0 :                     } else {
    1555            0 :                         eprintln!(
    1556            0 :                             "Total coverage from contributing layers: {:?}",
    1557            0 :                             coverage.ranges
    1558            0 :                         );
    1559            0 :                     }
    1560            0 :                 }
    1561         1564 :             }
    1562         1564 :             assert_eq!(expect_visible, *visible);
    1563              :         }
    1564              : 
    1565              :         // Sanity: the layer that holds latest data for the DBDIR key should always be visible
    1566              :         // (just using this key as a key that will always exist for any layermap fixture)
    1567            1 :         let dbdir_layer = {
    1568            1 :             let readable_layer = layer_map
    1569            1 :                 .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
    1570            1 :                 .unwrap();
    1571              : 
    1572            1 :             match readable_layer.layer {
    1573            1 :                 ReadableLayerWeak::PersistentLayer(desc) => desc,
    1574            0 :                 ReadableLayerWeak::InMemoryLayer(_) => unreachable!(""),
    1575              :             }
    1576              :         };
    1577            1 :         assert!(matches!(
    1578            1 :             layer_visibilities.get(&dbdir_layer).unwrap(),
    1579              :             LayerVisibilityHint::Visible
    1580              :         ));
    1581            1 :     }
    1582              : }
    1583              : 
    1584              : #[cfg(test)]
    1585              : mod select_layer_tests {
    1586              :     use super::*;
    1587              : 
    1588           17 :     fn create_persistent_layer(
    1589           17 :         start_lsn: u64,
    1590           17 :         end_lsn: u64,
    1591           17 :         is_delta: bool,
    1592           17 :     ) -> Arc<PersistentLayerDesc> {
    1593           17 :         if !is_delta {
    1594            8 :             assert_eq!(end_lsn, start_lsn + 1);
    1595            9 :         }
    1596              : 
    1597           17 :         Arc::new(PersistentLayerDesc::new_test(
    1598           17 :             Key::MIN..Key::MAX,
    1599           17 :             Lsn(start_lsn)..Lsn(end_lsn),
    1600           17 :             is_delta,
    1601              :         ))
    1602           17 :     }
    1603              : 
    1604            6 :     fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc {
    1605            6 :         InMemoryLayerDesc {
    1606            6 :             handle: InMemoryLayerHandle::Open,
    1607            6 :             lsn_range: Lsn(start_lsn)..Lsn(end_lsn),
    1608            6 :         }
    1609            6 :     }
    1610              : 
    1611              :     #[test]
    1612            1 :     fn test_select_layer_empty() {
    1613            1 :         assert!(LayerMap::select_layer(None, None, None, Lsn(100)).is_none());
    1614            1 :     }
    1615              : 
    1616              :     #[test]
    1617            1 :     fn test_select_layer_only_delta() {
    1618            1 :         let delta = create_persistent_layer(10, 20, true);
    1619            1 :         let result = LayerMap::select_layer(Some(delta.clone()), None, None, Lsn(100)).unwrap();
    1620              : 
    1621            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1622            1 :         assert!(
    1623            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1624              :         );
    1625            1 :     }
    1626              : 
    1627              :     #[test]
    1628            1 :     fn test_select_layer_only_image() {
    1629            1 :         let image = create_persistent_layer(10, 11, false);
    1630            1 :         let result = LayerMap::select_layer(None, Some(image.clone()), None, Lsn(100)).unwrap();
    1631              : 
    1632            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1633            1 :         assert!(
    1634            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1635              :         );
    1636            1 :     }
    1637              : 
    1638              :     #[test]
    1639            1 :     fn test_select_layer_only_inmem() {
    1640            1 :         let inmem = create_inmem_layer(10, 20);
    1641            1 :         let result = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap();
    1642              : 
    1643            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1644            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1645            1 :     }
    1646              : 
    1647              :     #[test]
    1648            1 :     fn test_select_layer_image_inside_delta() {
    1649            1 :         let delta = create_persistent_layer(10, 20, true);
    1650            1 :         let image = create_persistent_layer(15, 16, false);
    1651              : 
    1652            1 :         let result =
    1653            1 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(100))
    1654            1 :                 .unwrap();
    1655              : 
    1656            1 :         assert_eq!(result.lsn_floor, Lsn(16));
    1657            1 :         assert!(
    1658            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1659              :         );
    1660              : 
    1661            1 :         let result = LayerMap::select_layer(
    1662            1 :             Some(delta.clone()),
    1663            1 :             Some(image.clone()),
    1664            1 :             None,
    1665            1 :             result.lsn_floor,
    1666              :         )
    1667            1 :         .unwrap();
    1668              : 
    1669            1 :         assert_eq!(result.lsn_floor, Lsn(15));
    1670            1 :         assert!(
    1671            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1672              :         );
    1673            1 :     }
    1674              : 
    1675              :     #[test]
    1676            1 :     fn test_select_layer_newer_image() {
    1677            1 :         let delta = create_persistent_layer(10, 20, true);
    1678            1 :         let image = create_persistent_layer(25, 26, false);
    1679              : 
    1680            1 :         let result =
    1681            1 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1682            1 :                 .unwrap();
    1683              : 
    1684            1 :         assert_eq!(result.lsn_floor, Lsn(25));
    1685            1 :         assert!(
    1686            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1687              :         );
    1688              : 
    1689            1 :         let result =
    1690            1 :             LayerMap::select_layer(Some(delta.clone()), None, None, result.lsn_floor).unwrap();
    1691              : 
    1692            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1693            1 :         assert!(
    1694            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1695              :         );
    1696            1 :     }
    1697              : 
    1698              :     #[test]
    1699            1 :     fn test_select_layer_delta_with_older_image() {
    1700            1 :         let delta = create_persistent_layer(15, 25, true);
    1701            1 :         let image = create_persistent_layer(10, 11, false);
    1702              : 
    1703            1 :         let result =
    1704            1 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1705            1 :                 .unwrap();
    1706              : 
    1707            1 :         assert_eq!(result.lsn_floor, Lsn(15));
    1708            1 :         assert!(
    1709            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1710              :         );
    1711              : 
    1712            1 :         let result =
    1713            1 :             LayerMap::select_layer(None, Some(image.clone()), None, result.lsn_floor).unwrap();
    1714              : 
    1715            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1716            1 :         assert!(
    1717            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1718              :         );
    1719            1 :     }
    1720              : 
    1721              :     #[test]
    1722            1 :     fn test_select_layer_image_inside_inmem() {
    1723            1 :         let image = create_persistent_layer(15, 16, false);
    1724            1 :         let inmem = create_inmem_layer(10, 25);
    1725              : 
    1726            1 :         let result =
    1727            1 :             LayerMap::select_layer(None, Some(image.clone()), Some(inmem.clone()), Lsn(30))
    1728            1 :                 .unwrap();
    1729              : 
    1730            1 :         assert_eq!(result.lsn_floor, Lsn(16));
    1731            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1732              : 
    1733            1 :         let result = LayerMap::select_layer(
    1734            1 :             None,
    1735            1 :             Some(image.clone()),
    1736            1 :             Some(inmem.clone()),
    1737            1 :             result.lsn_floor,
    1738              :         )
    1739            1 :         .unwrap();
    1740              : 
    1741            1 :         assert_eq!(result.lsn_floor, Lsn(15));
    1742            1 :         assert!(
    1743            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1744              :         );
    1745              : 
    1746            1 :         let result =
    1747            1 :             LayerMap::select_layer(None, None, Some(inmem.clone()), result.lsn_floor).unwrap();
    1748            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1749            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1750            1 :     }
    1751              : 
    1752              :     #[test]
    1753            1 :     fn test_select_layer_delta_inside_inmem() {
    1754            1 :         let delta_top = create_persistent_layer(15, 20, true);
    1755            1 :         let delta_bottom = create_persistent_layer(10, 15, true);
    1756            1 :         let inmem = create_inmem_layer(15, 25);
    1757              : 
    1758            1 :         let result =
    1759            1 :             LayerMap::select_layer(Some(delta_top.clone()), None, Some(inmem.clone()), Lsn(30))
    1760            1 :                 .unwrap();
    1761              : 
    1762            1 :         assert_eq!(result.lsn_floor, Lsn(20));
    1763            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1764              : 
    1765            1 :         let result = LayerMap::select_layer(
    1766            1 :             Some(delta_top.clone()),
    1767            1 :             None,
    1768            1 :             Some(inmem.clone()),
    1769            1 :             result.lsn_floor,
    1770              :         )
    1771            1 :         .unwrap();
    1772              : 
    1773            1 :         assert_eq!(result.lsn_floor, Lsn(15));
    1774            1 :         assert!(
    1775            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_top))
    1776              :         );
    1777              : 
    1778            1 :         let result = LayerMap::select_layer(
    1779            1 :             Some(delta_bottom.clone()),
    1780            1 :             None,
    1781            1 :             Some(inmem.clone()),
    1782            1 :             result.lsn_floor,
    1783              :         )
    1784            1 :         .unwrap();
    1785            1 :         assert_eq!(result.lsn_floor, Lsn(10));
    1786            1 :         assert!(
    1787            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_bottom))
    1788              :         );
    1789            1 :     }
    1790              : 
    1791              :     #[test]
    1792            1 :     fn test_select_layer_all_overlap_1() {
    1793            1 :         let inmem = create_inmem_layer(10, 30);
    1794            1 :         let delta = create_persistent_layer(15, 25, true);
    1795            1 :         let image = create_persistent_layer(20, 21, false);
    1796              : 
    1797            1 :         let result = LayerMap::select_layer(
    1798            1 :             Some(delta.clone()),
    1799            1 :             Some(image.clone()),
    1800            1 :             Some(inmem.clone()),
    1801            1 :             Lsn(50),
    1802              :         )
    1803            1 :         .unwrap();
    1804              : 
    1805            1 :         assert_eq!(result.lsn_floor, Lsn(25));
    1806            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1807              : 
    1808            1 :         let result = LayerMap::select_layer(
    1809            1 :             Some(delta.clone()),
    1810            1 :             Some(image.clone()),
    1811            1 :             Some(inmem.clone()),
    1812            1 :             result.lsn_floor,
    1813              :         )
    1814            1 :         .unwrap();
    1815              : 
    1816            1 :         assert_eq!(result.lsn_floor, Lsn(21));
    1817            1 :         assert!(
    1818            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1819              :         );
    1820              : 
    1821            1 :         let result = LayerMap::select_layer(
    1822            1 :             Some(delta.clone()),
    1823            1 :             Some(image.clone()),
    1824            1 :             Some(inmem.clone()),
    1825            1 :             result.lsn_floor,
    1826              :         )
    1827            1 :         .unwrap();
    1828              : 
    1829            1 :         assert_eq!(result.lsn_floor, Lsn(20));
    1830            1 :         assert!(
    1831            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1832              :         );
    1833            1 :     }
    1834              : 
    1835              :     #[test]
    1836            1 :     fn test_select_layer_all_overlap_2() {
    1837            1 :         let inmem = create_inmem_layer(20, 30);
    1838            1 :         let delta = create_persistent_layer(10, 40, true);
    1839            1 :         let image = create_persistent_layer(25, 26, false);
    1840              : 
    1841            1 :         let result = LayerMap::select_layer(
    1842            1 :             Some(delta.clone()),
    1843            1 :             Some(image.clone()),
    1844            1 :             Some(inmem.clone()),
    1845            1 :             Lsn(50),
    1846              :         )
    1847            1 :         .unwrap();
    1848              : 
    1849            1 :         assert_eq!(result.lsn_floor, Lsn(26));
    1850            1 :         assert!(
    1851            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1852              :         );
    1853              : 
    1854            1 :         let result = LayerMap::select_layer(
    1855            1 :             Some(delta.clone()),
    1856            1 :             Some(image.clone()),
    1857            1 :             Some(inmem.clone()),
    1858            1 :             result.lsn_floor,
    1859              :         )
    1860            1 :         .unwrap();
    1861              : 
    1862            1 :         assert_eq!(result.lsn_floor, Lsn(25));
    1863            1 :         assert!(
    1864            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1865              :         );
    1866            1 :     }
    1867              : 
    1868              :     #[test]
    1869            1 :     fn test_select_layer_all_overlap_3() {
    1870            1 :         let inmem = create_inmem_layer(30, 40);
    1871            1 :         let delta = create_persistent_layer(10, 30, true);
    1872            1 :         let image = create_persistent_layer(20, 21, false);
    1873              : 
    1874            1 :         let result = LayerMap::select_layer(
    1875            1 :             Some(delta.clone()),
    1876            1 :             Some(image.clone()),
    1877            1 :             Some(inmem.clone()),
    1878            1 :             Lsn(50),
    1879              :         )
    1880            1 :         .unwrap();
    1881              : 
    1882            1 :         assert_eq!(result.lsn_floor, Lsn(30));
    1883            1 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1884              : 
    1885            1 :         let result = LayerMap::select_layer(
    1886            1 :             Some(delta.clone()),
    1887            1 :             Some(image.clone()),
    1888            1 :             None,
    1889            1 :             result.lsn_floor,
    1890              :         )
    1891            1 :         .unwrap();
    1892              : 
    1893            1 :         assert_eq!(result.lsn_floor, Lsn(21));
    1894            1 :         assert!(
    1895            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1896              :         );
    1897              : 
    1898            1 :         let result = LayerMap::select_layer(
    1899            1 :             Some(delta.clone()),
    1900            1 :             Some(image.clone()),
    1901            1 :             None,
    1902            1 :             result.lsn_floor,
    1903              :         )
    1904            1 :         .unwrap();
    1905              : 
    1906            1 :         assert_eq!(result.lsn_floor, Lsn(20));
    1907            1 :         assert!(
    1908            1 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1909              :         );
    1910            1 :     }
    1911              : }
        

Generated by: LCOV version 2.1-beta