LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 94.9 % 1263 1199
Test Date: 2025-07-16 12:29:03 Functions: 96.6 % 88 85

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! frozen, and it is split up into new image and delta layers and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use std::collections::{BTreeMap, HashMap, VecDeque};
      50              : use std::iter::Peekable;
      51              : use std::ops::Range;
      52              : use std::sync::Arc;
      53              : use std::time::Instant;
      54              : 
      55              : use anyhow::Result;
      56              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      57              : pub use historic_layer_coverage::LayerKey;
      58              : use pageserver_api::key::Key;
      59              : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
      60              : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
      61              : use tokio::sync::watch;
      62              : use utils::lsn::Lsn;
      63              : 
      64              : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
      65              : use crate::context::RequestContext;
      66              : use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak};
      67              : 
      68              : ///
      69              : /// LayerMap tracks what layers exist on a timeline.
      70              : ///
      71              : pub struct LayerMap {
      72              :     //
      73              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      74              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      75              :     // the start LSN of the next InMemoryLayer that is to be created.
      76              :     //
      77              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      78              :     pub next_open_layer_at: Option<Lsn>,
      79              : 
      80              :     ///
      81              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      82              :     /// are no longer added to, but haven't been written out to disk
      83              :     /// yet. They contain WAL older than the current 'open_layer' or
      84              :     /// 'next_open_layer_at', but newer than any historic layer.
      85              :     /// The frozen layers are in order from oldest to newest, so that
      86              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      87              :     /// in the 'front'.
      88              :     ///
      89              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      90              : 
      91              :     /// Index of the historic layers optimized for search
      92              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      93              : 
      94              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
      95              :     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
      96              :     /// (NOTE(review): the module docs describe the index as a persistent BST, not an R-tree; this wording may be stale.)
      97              :     /// NB: make sure to notify `watch_l0_deltas` on changes.
      98              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      99              : 
     100              :     /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
     101              :     watch_l0_deltas: watch::Sender<usize>,
     102              : }
     103              : 
     104              : impl Default for LayerMap {
     105          240 :     fn default() -> Self { // starts with no open layer, no frozen layers and no L0 layers
     106          240 :         Self {
     107          240 :             open_layer: Default::default(),
     108          240 :             next_open_layer_at: Default::default(),
     109          240 :             frozen_layers: Default::default(),
     110          240 :             historic: Default::default(),
     111          240 :             l0_delta_layers: Default::default(),
     112          240 :             watch_l0_deltas: watch::channel(0).0, // keep only the Sender half; the initial L0 count is 0
     113          240 :         }
     114          240 :     }
     115              : }
     116              : 
     117              : /// The primary update API for the layer map.
     118              : ///
     119              : /// Batching historic layer insertions and removals is good for
     120              : /// performance and this struct helps us do that correctly.
     121              : #[must_use]
     122              : pub struct BatchedUpdates<'a> {
     123              :     // While we hold this exclusive reference to the layer map the borrow checker
     124              :     // will prevent us from accidentally reading any unflushed updates.
     125              :     layer_map: &'a mut LayerMap,
     126              : }
     127              : 
     128              : /// Provide ability to batch more updates while hiding the read
     129              : /// API so we don't accidentally read without flushing.
     130              : impl BatchedUpdates<'_> {
     131              :     ///
     132              :     /// Insert an on-disk layer.
     133              :     /// The insertion is buffered; it is applied when the batch is flushed.
     134              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` (NOTE(review): looks stale, only `layer_desc` is taken here)
     135         2488 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     136         2488 :         self.layer_map.insert_historic_noflush(layer_desc)
     137         2488 :     }
     138              : 
     139              :     ///
     140              :     /// Remove an on-disk layer from the map.
     141              :     ///
     142              :     /// This should be called when the corresponding file on disk has been deleted.
     143              :     /// The removal is buffered; it is applied when the batch is flushed.
     144          261 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     145          261 :         self.layer_map.remove_historic_noflush(layer_desc)
     146          261 :     }
     147              : 
     148              :     // We will flush on drop anyway, but this method makes it
     149              :     // more explicit that there is some work being done.
     150              :     /// Apply all updates
     151          820 :     pub fn flush(self) {
     152              :         // Nothing to do: `self` is dropped here and the `Drop` impl calls `flush_updates()`
     153          820 :     }
     154              : }
     155              : 
     156              : // Ideally the flush() method should be called explicitly for more
     157              : // controlled execution. But if we forget we'd rather flush on drop
     158              : // than panic later or read without flushing.
     159              : //
     160              : // TODO maybe warn if flush hasn't explicitly been called
     161              : impl Drop for BatchedUpdates<'_> {
     162          827 :     fn drop(&mut self) { // also runs after an explicit flush(), which consumes the batch by value
     163          827 :         self.layer_map.flush_updates();
     164          827 :     }
     165              : }
     166              : 
     167              : /// Return value of [`LayerMap::search`]
     168              : #[derive(Eq, PartialEq, Debug, Hash)]
     169              : pub struct SearchResult {
     170              :     pub layer: ReadableLayerWeak, // the selected layer (persistent or in-memory)
     171              :     pub lsn_floor: Lsn, // callers pass this as `end_lsn` to the next `search` call
     172              : }
     173              : 
     174              : /// Return value of [`LayerMap::range_search`]
     175              : ///
     176              : /// Contains a mapping from a search result (selected layer plus LSN floor) to a keyspace
     177              : /// accumulator that contains all the keys which intersect the layer
     178              : /// from the original search space.
     179              : #[derive(Debug)]
     180              : pub struct RangeSearchResult {
     181              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     182              : }
     183              : 
     184              : impl RangeSearchResult {
     185       528009 :     fn new() -> Self { // empty result: nothing found yet
     186       528009 :         Self {
     187       528009 :             found: HashMap::new(),
     188       528009 :         }
     189       528009 :     }
     190              : 
     191       167971 :     fn map_to_in_memory_layer( // maps the whole `range` to the in-memory layer, if one is given
     192       167971 :         in_memory_layer: Option<InMemoryLayerDesc>,
     193       167971 :         range: Range<Key>,
     194       167971 :     ) -> RangeSearchResult {
     195       167971 :         match in_memory_layer {
     196        54707 :             Some(inmem) => {
     197        54707 :                 let search_result = SearchResult {
     198        54707 :                     lsn_floor: inmem.get_lsn_range().start, // floor is the start of the layer's LSN range
     199        54707 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     200        54707 :                 };
     201              : 
     202        54707 :                 let mut accum = KeySpaceAccum::new();
     203        54707 :                 accum.add_range(range);
     204        54707 :                 RangeSearchResult {
     205        54707 :                     found: HashMap::from([(search_result, accum)]),
     206        54707 :                 }
     207              :             }
     208       113264 :             None => RangeSearchResult::new(), // no in-memory candidate: empty result
     209              :         }
     210       167971 :     }
     211              : }
     212              : 
     213              : /// Collector for results of range search queries on the LayerMap.
     214              : /// It should be provided with two iterators for the delta and image coverage
     215              : /// that contain all the changes for layers which intersect the range.
     216              : struct RangeSearchCollector<Iter>
     217              : where
     218              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     219              : {
     220              :     in_memory_layer: Option<InMemoryLayerDesc>, // fallback for key ranges no persistent layer is selected for
     221              :     delta_coverage: Peekable<Iter>,
     222              :     image_coverage: Peekable<Iter>,
     223              :     key_range: Range<Key>,
     224              :     end_lsn: Lsn,
     225              : 
     226              :     current_delta: Option<Arc<PersistentLayerDesc>>, // delta layer from the last consumed delta change (see advance())
     227              :     current_image: Option<Arc<PersistentLayerDesc>>, // image layer from the last consumed image change (see advance())
     228              : 
     229              :     result: RangeSearchResult,
     230              : }
     231              : 
     232              : #[derive(Debug)]
     233              : enum NextLayerType {
     234              :     Delta(i128), // next change is in the delta coverage only; payload is the key as i128
     235              :     Image(i128), // next change is in the image coverage only
     236              :     Both(i128), // delta and image coverage both change at the same key
     237              : }
     238              : 
     239              : impl NextLayerType {
     240       823412 :     fn next_change_at_key(&self) -> Key { // the key of the next coverage change, whichever variant this is
     241       823412 :         match self {
     242        80183 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     243       317689 :             NextLayerType::Image(at) => Key::from_i128(*at),
     244       425540 :             NextLayerType::Both(at) => Key::from_i128(*at),
     245              :         }
     246       823412 :     }
     247              : }
     248              : 
     249              : impl<Iter> RangeSearchCollector<Iter>
     250              : where
     251              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     252              : {
     253       411204 :     fn new( // builds a collector with no current layers and an empty result
     254       411204 :         key_range: Range<Key>,
     255       411204 :         end_lsn: Lsn,
     256       411204 :         in_memory_layer: Option<InMemoryLayerDesc>,
     257       411204 :         delta_coverage: Iter,
     258       411204 :         image_coverage: Iter,
     259       411204 :     ) -> Self {
     260       411204 :         Self {
     261       411204 :             in_memory_layer,
     262       411204 :             delta_coverage: delta_coverage.peekable(), // peekable so choose_next_layer_type() can look ahead without consuming
     263       411204 :             image_coverage: image_coverage.peekable(),
     264       411204 :             key_range,
     265       411204 :             end_lsn,
     266       411204 :             current_delta: None,
     267       411204 :             current_image: None,
     268       411204 :             result: RangeSearchResult::new(),
     269       411204 :         }
     270       411204 :     }
     271              : 
     272              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     273              :     /// One pointer tracks the start of the current range and the other tracks
     274              :     /// the beginning of the next range which will overlap with the next change
     275              :     /// in coverage across both image and delta.
     276       411204 :     fn collect(mut self) -> RangeSearchResult {
     277       411204 :         let next_layer_type = self.choose_next_layer_type();
     278       407873 :         let mut current_range_start = match next_layer_type {
     279              :             None => {
     280              :                 // No coverage changes at all: map the whole range to the in-memory layer (if any)
     281         3331 :                 self.pad_range(self.key_range.clone());
     282         3331 :                 return self.result;
     283              :             }
     284       407873 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     285              :                 // Changes only after the end of the range: same fallback for the whole range
     286            0 :                 self.pad_range(self.key_range.clone());
     287            0 :                 return self.result;
     288              :             }
     289       407873 :             Some(layer_type) => {
     290              :                 // Changes for the range exist: pad the gap before the first change, then consume it.
     291       407873 :                 let coverage_start = layer_type.next_change_at_key();
     292       407873 :                 let range_before = self.key_range.start..coverage_start;
     293       407873 :                 self.pad_range(range_before);
     294              : 
     295       407873 :                 self.advance(&layer_type);
     296       407873 :                 coverage_start
     297              :             }
     298              :         };
     299              : 
     300       823412 :         while current_range_start < self.key_range.end {
     301       415539 :             let next_layer_type = self.choose_next_layer_type();
     302       415539 :             match next_layer_type {
     303         7666 :                 Some(t) => { // close the current range at the next change, then consume that change
     304         7666 :                     let current_range_end = t.next_change_at_key();
     305         7666 :                     self.add_range(current_range_start..current_range_end);
     306         7666 :                     current_range_start = current_range_end;
     307         7666 : 
     308         7666 :                     self.advance(&t);
     309         7666 :                 }
     310       407873 :                 None => { // no more changes: the current layers apply up to the end of the range
     311       407873 :                     self.add_range(current_range_start..self.key_range.end);
     312       407873 :                     current_range_start = self.key_range.end;
     313       407873 :                 }
     314              :             }
     315              :         }
     316              : 
     317       407873 :         self.result
     318       411204 :     }
     319              : 
     320              :     /// Map a range which does not intersect any persistent layers to
     321              :     /// the in-memory layer candidate. Does nothing for an empty range or when there is no candidate.
     322       411675 :     fn pad_range(&mut self, key_range: Range<Key>) {
     323       411675 :         if !key_range.is_empty() {
     324         4789 :             if let Some(ref inmem) = self.in_memory_layer {
     325         1639 :                 let search_result = SearchResult {
     326         1639 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()),
     327         1639 :                     lsn_floor: inmem.get_lsn_range().start,
     328         1639 :                 };
     329         1639 : 
     330         1639 :                 self.result
     331         1639 :                     .found
     332         1639 :                     .entry(search_result) // ranges mapped to the same layer accumulate in one entry
     333         1639 :                     .or_default()
     334         1639 :                     .add_range(key_range);
     335         3150 :             }
     336       406886 :         }
     337       411675 :     }
     338              : 
     339              :     /// Select the appropriate layer for the given range and update
     340              :     /// the collector.
     341       415539 :     fn add_range(&mut self, covered_range: Range<Key>) {
     342       415539 :         let selected = LayerMap::select_layer(
     343       415539 :             self.current_delta.clone(),
     344       415539 :             self.current_image.clone(),
     345       415539 :             self.in_memory_layer.clone(),
     346       415539 :             self.end_lsn,
     347              :         );
     348              : 
     349       415539 :         match selected {
     350       415068 :             Some(search_result) => self
     351       415068 :                 .result
     352       415068 :                 .found
     353       415068 :                 .entry(search_result)
     354       415068 :                 .or_default()
     355       415068 :                 .add_range(covered_range),
     356          471 :             None => self.pad_range(covered_range), // no layer selected: fall back to the in-memory padding path
     357              :         }
     358       415539 :     }
     359              : 
     360              :     /// Move to the next coverage change, updating the current delta and/or image layer.
     361       415539 :     fn advance(&mut self, layer_type: &NextLayerType) {
     362       415539 :         match layer_type {
     363        41094 :             NextLayerType::Delta(_) => {
     364        41094 :                 let (_, layer) = self.delta_coverage.next().unwrap(); // unwrap: collect() gets `layer_type` from choose_next_layer_type(), which peeked this entry
     365        41094 :                 self.current_delta = layer;
     366        41094 :             }
     367       159414 :             NextLayerType::Image(_) => {
     368       159414 :                 let (_, layer) = self.image_coverage.next().unwrap();
     369       159414 :                 self.current_image = layer;
     370       159414 :             }
     371       215031 :             NextLayerType::Both(_) => {
     372       215031 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     373       215031 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     374       215031 : 
     375       215031 :                 self.current_image = image_layer;
     376       215031 :                 self.current_delta = delta_layer;
     377       215031 :             }
     378              :         }
     379       415539 :     }
     380              : 
     381              :     /// Pick the next coverage change: the one at the lesser key or both if they're aligned.
     382       826743 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     383       826743 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key); // peek only; advance() does the consuming
     384       826743 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     385              : 
     386       826743 :         match (next_delta_at, next_image_at) {
     387       411204 :             (None, None) => None,
     388        38573 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     389       158978 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     390       217988 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     391          436 :                 Some(NextLayerType::Image(*next_image_at))
     392              :             }
     393       217552 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     394         2521 :                 Some(NextLayerType::Delta(*next_delta_at))
     395              :             }
     396       215031 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)), // equal keys: both coverages change together
     397              :         }
     398       826743 :     }
     399              : }
     400              : 
     401              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     402              : pub struct InMemoryLayerDesc {
     403              :     handle: InMemoryLayerHandle, // which in-memory layer this is: the open one or a frozen one
     404              :     lsn_range: Range<Lsn>, // its start is used as the `lsn_floor` of search results for this layer
     405              : }
     406              : 
     407              : impl InMemoryLayerDesc {
     408       644750 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> { // returns a copy of the LSN range
     409       644750 :         self.lsn_range.clone()
     410       644750 :     }
     411              : }
     412              : 
     413              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     414              : enum InMemoryLayerHandle {
     415              :     Open, // presumably refers to `LayerMap::open_layer`; TODO confirm
     416              :     Frozen(usize), // presumably an index into `LayerMap::frozen_layers`; TODO confirm
     417              : }
     418              : 
     419              : impl LayerMap {
     420              :     ///
     421              :     /// Find the latest layer (by lsn.end) that covers the given
     422              :     /// 'key', with lsn.start < 'end_lsn'.
     423              :     ///
     424              :     /// The caller of this function is the page reconstruction
     425              :     /// algorithm looking for the next relevant delta layer, or
     426              :     /// the terminal image layer. The caller will pass the lsn_floor
     427              :     /// value as end_lsn in the next call to search.
     428              :     ///
     429              :     /// If there's an image layer exactly below the given end_lsn,
     430              :     /// search should return that layer regardless if there are
     431              :     /// overlapping deltas.
     432              :     ///
     433              :     /// If the latest layer is a delta and there is an overlapping
     434              :     /// image with it below, the lsn_floor returned should be right
     435              :     /// above that image so we don't skip it in the search. Otherwise
     436              :     /// the lsn_floor returned should be the bottom of the delta layer
     437              :     /// because we should make as much progress down the lsn axis
     438              :     /// as possible. It's fine if this way we skip some overlapping
     439              :     /// deltas, because the delta we returned would contain the same
     440              :     /// wal content.
     441              :     ///
     442              :     /// TODO: This API is convoluted and inefficient. If the caller
     443              :     /// makes N search calls, we'll end up finding the same latest
     444              :     /// image layer N times. We should either cache the latest image
     445              :     /// layer result, or simplify the api to `get_latest_image` and
     446              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     447              :     ///
     448        71981 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     449        71981 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     450              : 
     451        71981 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) { // NOTE(review): `end_lsn.0 - 1` assumes end_lsn > 0 and `get().unwrap()` panics on Err; confirm both are invariants
     452        71981 :             Some(version) => version,
     453              :             None => { // no historic version for this LSN: only the in-memory candidate can be returned
     454            0 :                 return in_memory_layer.map(|desc| SearchResult {
     455            0 :                     lsn_floor: desc.get_lsn_range().start,
     456            0 :                     layer: ReadableLayerWeak::InMemoryLayer(desc),
     457            0 :                 });
     458              :             }
     459              :         };
     460              : 
     461        71981 :         let latest_delta = version.delta_coverage.query(key.to_i128()); // delta/image candidates for `key` in that version
     462        71981 :         let latest_image = version.image_coverage.query(key.to_i128());
     463              : 
     464        71981 :         Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn)
     465        71981 :     }
     466              : 
     467              :     /// Select a layer from three potential candidates (in-memory, delta and image layer).
     468              :     /// The candidates represent the first layer of each type which intersect a key range.
     469              :     ///
     470              :     /// Layer types have an implicit priority (image > delta > in-memory). For instance,
     471              :     /// if we have the option of reading an LSN range from both an image and a delta, we
     472              :     /// should read from the image.
     473       733310 :     fn select_layer(
     474       733310 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     475       733310 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     476       733310 :         in_memory_layer: Option<InMemoryLayerDesc>,
     477       733310 :         end_lsn: Lsn,
     478       733310 :     ) -> Option<SearchResult> {
     479       733310 :         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
     480       733310 :         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
     481              : 
     482       733310 :         match (delta_layer, image_layer, in_memory_layer) {
     483         5802 :             (None, None, None) => None,
     484        28867 :             (None, Some(image), None) => {
     485        28867 :                 let lsn_floor = image.get_lsn_range().start;
     486        28867 :                 Some(SearchResult {
     487        28867 :                     layer: ReadableLayerWeak::PersistentLayer(image),
     488        28867 :                     lsn_floor,
     489        28867 :                 })
     490              :             }
     491        40206 :             (Some(delta), None, None) => {
     492        40206 :                 let lsn_floor = delta.get_lsn_range().start;
     493        40206 :                 Some(SearchResult {
     494        40206 :                     layer: ReadableLayerWeak::PersistentLayer(delta),
     495        40206 :                     lsn_floor,
     496        40206 :                 })
     497              :             }
     498       238234 :             (Some(delta), Some(image), None) => {
     499       238234 :                 let img_lsn = image.get_lsn_range().start;
     500       238234 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     501       238234 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     502       238234 :                 if image_is_newer || image_exact_match {
     503        21310 :                     Some(SearchResult {
     504        21310 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     505        21310 :                         lsn_floor: img_lsn,
     506        21310 :                     })
     507              :                 } else {
     508              :                     // If the delta overlaps with the image in the LSN dimension, do a partial
     509              :                     // up to the image layer.
     510       216924 :                     let lsn_floor =
     511       216924 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     512       216924 :                     Some(SearchResult {
     513       216924 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     514       216924 :                         lsn_floor,
     515       216924 :                     })
     516              :                 }
     517              :             }
     518         5791 :             (None, None, Some(inmem)) => {
     519         5791 :                 let lsn_floor = inmem.get_lsn_range().start;
     520         5791 :                 Some(SearchResult {
     521         5791 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     522         5791 :                     lsn_floor,
     523         5791 :                 })
     524              :             }
     525       169628 :             (None, Some(image), Some(inmem)) => {
     526              :                 // If the in-memory layer overlaps with the image in the LSN dimension, do a partial
     527              :                 // read up to the image layer.
     528       169628 :                 let img_lsn = image.get_lsn_range().start;
     529       169628 :                 let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end;
     530       169628 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     531       169628 :                 if image_is_newer || image_exact_match {
     532          437 :                     Some(SearchResult {
     533          437 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     534          437 :                         lsn_floor: img_lsn,
     535          437 :                     })
     536              :                 } else {
     537       169191 :                     let lsn_floor =
     538       169191 :                         std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1);
     539       169191 :                     Some(SearchResult {
     540       169191 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     541       169191 :                         lsn_floor,
     542       169191 :                     })
     543              :                 }
     544              :             }
     545       121899 :             (Some(delta), None, Some(inmem)) => {
     546              :                 // Overlaps between delta and in-memory layers are not a valid
     547              :                 // state, but we handle them here for completeness.
     548       121899 :                 let delta_end = delta.get_lsn_range().end;
     549       121899 :                 let delta_is_newer = delta_end >= inmem.get_lsn_range().end;
     550       121899 :                 let delta_exact_match = delta_end == end_lsn;
     551       121899 :                 if delta_is_newer || delta_exact_match {
     552            4 :                     Some(SearchResult {
     553            4 :                         lsn_floor: delta.get_lsn_range().start,
     554            4 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     555            4 :                     })
     556              :                 } else {
     557              :                     // If the in-memory layer overlaps with the delta in the LSN dimension, do a partial
     558              :                     // read up to the delta layer.
     559       121895 :                     let lsn_floor =
     560       121895 :                         std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end);
     561       121895 :                     Some(SearchResult {
     562       121895 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     563       121895 :                         lsn_floor,
     564       121895 :                     })
     565              :                 }
     566              :             }
     567       122883 :             (Some(delta), Some(image), Some(inmem)) => {
     568              :                 // Determine the preferred persistent layer without taking the in-memory layer
     569              :                 // into consideration.
     570       122883 :                 let persistent_res =
     571       122883 :                     Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn)
     572       122883 :                         .unwrap();
     573       122883 :                 let persistent_l = match persistent_res.layer {
     574       122883 :                     ReadableLayerWeak::PersistentLayer(l) => l,
     575            0 :                     ReadableLayerWeak::InMemoryLayer(_) => unreachable!(),
     576              :                 };
     577              : 
     578              :                 // Now handle the in-memory layer overlaps.
     579       122883 :                 let inmem_res = if persistent_l.is_delta() {
     580       116912 :                     Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn)
     581       116912 :                         .unwrap()
     582              :                 } else {
     583         5971 :                     Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn)
     584         5971 :                         .unwrap()
     585              :                 };
     586              : 
     587       122883 :                 Some(SearchResult {
     588       122883 :                     layer: inmem_res.layer,
     589       122883 :                     // Use the more restrictive LSN floor
     590       122883 :                     lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor),
     591       122883 :                 })
     592              :             }
     593              :         }
     594       733310 :     }
     595              : 
     596       579175 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     597       579175 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     598              : 
     599       579175 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     600       411204 :             Some(version) => version,
     601              :             None => {
     602       167971 :                 return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range);
     603              :             }
     604              :         };
     605              : 
     606       411204 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     607       411204 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     608       411204 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     609              : 
     610       411204 :         let collector = RangeSearchCollector::new(
     611       411204 :             key_range,
     612       411204 :             end_lsn,
     613       411204 :             in_memory_layer,
     614       411204 :             delta_changes,
     615       411204 :             image_changes,
     616              :         );
     617       411204 :         collector.collect()
     618       579175 :     }
     619              : 
     620              :     /// Start a batch of updates, applied on drop
     621          827 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     622          827 :         BatchedUpdates { layer_map: self }
     623          827 :     }
     624              : 
     625              :     ///
     626              :     /// Insert an on-disk layer
     627              :     ///
     628              :     /// Helper function for BatchedUpdates::insert_historic
     629              :     ///
     630              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     631         2493 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     632              :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     633              : 
     634         2493 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     635          500 :             self.l0_delta_layers.push(layer_desc.clone().into());
     636          500 :             self.watch_l0_deltas
     637          500 :                 .send_replace(self.l0_delta_layers.len());
     638         1993 :         }
     639              : 
     640         2493 :         self.historic.insert(
     641         2493 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     642         2493 :             layer_desc.into(),
     643              :         );
     644         2493 :     }
     645              : 
     646              :     ///
     647              :     /// Remove an on-disk layer from the map.
     648              :     ///
     649              :     /// Helper function for BatchedUpdates::remove_historic
     650              :     ///
     651          261 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     652          261 :         self.historic
     653          261 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     654          261 :         let layer_key = layer_desc.key();
     655          261 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     656          202 :             let len_before = self.l0_delta_layers.len();
     657          202 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     658         2717 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     659          202 :             self.l0_delta_layers = l0_delta_layers;
     660          202 :             self.watch_l0_deltas
     661          202 :                 .send_replace(self.l0_delta_layers.len());
     662              :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     663              :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     664              :             // vtable) pairs.
     665          202 :             assert_eq!(
     666          202 :                 self.l0_delta_layers.len(),
     667          202 :                 len_before - 1,
     668            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     669              :             );
     670           59 :         }
     671          261 :     }
     672              : 
     673              :     /// Helper function for BatchedUpdates::drop.
     674          828 :     pub(self) fn flush_updates(&mut self) {
     675          828 :         self.historic.rebuild();
     676          828 :     }
     677              : 
     678              :     /// Is there a newer image layer for given key- and LSN-range? Or a set
     679              :     /// of image layers within the specified lsn range that cover the entire
     680              :     /// specified key range?
     681              :     ///
     682              :     /// This is used for garbage collection, to determine if an old layer can
     683              :     /// be deleted.
     684         5825 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     685         5825 :         if key.is_empty() {
     686              :             // Vacuously true. There's a newer image for all 0 of the keys in the range.
     687            0 :             return true;
     688         5825 :         }
     689              : 
     690         5825 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     691         5825 :             Some(v) => v,
     692            0 :             None => return false,
     693              :         };
     694              : 
     695         5825 :         let start = key.start.to_i128();
     696         5825 :         let end = key.end.to_i128();
     697              : 
     698         5898 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     699         5875 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     700           23 :             None => false,
     701         5898 :         };
     702              : 
     703              :         // Check the start is covered
     704         5825 :         if !layer_covers(version.image_coverage.query(start)) {
     705         5798 :             return false;
     706           27 :         }
     707              : 
     708              :         // Check after all changes of coverage
     709           73 :         for (_, change_val) in version.image_coverage.range(start..end) {
     710           73 :             if !layer_covers(change_val) {
     711           23 :                 return false;
     712           50 :             }
     713              :         }
     714              : 
     715            4 :         true
     716         5825 :     }
     717              : 
     718         2212 :     pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
     719         2212 :         self.historic.iter()
     720         2212 :     }
     721              : 
     722              :     /// Get a descriptor for an in-memory layer whose start LSN is below `below`: the open layer if it matches, otherwise the last matching frozen layer.
     723       651156 :     pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option<InMemoryLayerDesc> {
     724       651156 :         let is_below = |l: &Arc<InMemoryLayer>| {
     725       554239 :             let start_lsn = l.get_lsn_range().start;
     726       554239 :             below > start_lsn
     727       554239 :         };
     728              : 
     729       651156 :         if let Some(open) = &self.open_layer {
     730       452678 :             if is_below(open) {
     731       299612 :                 return Some(InMemoryLayerDesc {
     732       299612 :                     handle: InMemoryLayerHandle::Open,
     733       299612 :                     lsn_range: open.get_lsn_range(),
     734       299612 :                 });
     735       153066 :             }
     736       198478 :         }
     737              : 
     738       351544 :         self.frozen_layers
     739       351544 :             .iter()
     740       351544 :             .enumerate()
     741       351544 :             .rfind(|(_idx, l)| is_below(l))
     742       351544 :             .map(|(idx, l)| InMemoryLayerDesc {
     743        49697 :                 handle: InMemoryLayerHandle::Frozen(idx),
     744        49697 :                 lsn_range: l.get_lsn_range(),
     745        49697 :             })
     746       651156 :     }
     747              : 
     748       311161 :     pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc<InMemoryLayer> {
     749       311161 :         match desc.handle {
     750       299612 :             InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(),
     751        11549 :             InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(),
     752              :         }
     753       311161 :     }
     754              : 
     755              :     ///
     756              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     757              :     /// image layer that covers each range at the specified lsn (inclusive).
     758              :     /// This is used when creating new image layers.
     759          315 :     pub fn image_coverage(
     760          315 :         &self,
     761          315 :         key_range: &Range<Key>,
     762          315 :         lsn: Lsn,
     763          315 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     764          315 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     765          314 :             Some(v) => v,
     766            1 :             None => return vec![],
     767              :         };
     768              : 
     769          314 :         let start = key_range.start.to_i128();
     770          314 :         let end = key_range.end.to_i128();
     771              : 
     772              :         // Initialize loop variables
     773          314 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     774          314 :         let mut current_key = start;
     775          314 :         let mut current_val = version.image_coverage.query(start);
     776              : 
     777              :         // Loop through the change events and push intervals
     778          314 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     779           24 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     780           24 :             coverage.push((kr, current_val.take()));
     781           24 :             current_key = change_key;
     782           24 :             current_val.clone_from(&change_val);
     783           24 :         }
     784              : 
     785              :         // Add the final interval
     786          314 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     787          314 :         coverage.push((kr, current_val.take()));
     788              : 
     789          314 :         coverage
     790          315 :     }
     791              : 
     792              :     /// Check if the key range resembles that of an L0 layer.
     793         4660 :     pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
     794         4660 :         is_delta_layer && key_range == &(Key::MIN..Key::MAX)
     795         4660 :     }
     796              : 
     797              :     /// This function determines which layers are counted in `count_deltas`:
     798              :     /// layers that should count towards deciding whether or not to reimage
     799              :     /// a certain partition range.
     800              :     ///
     801              :     /// There are two kinds of layers we currently consider reimage-worthy:
     802              :     ///
     803              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     804              :     /// TODO Some of these layers are very sparse and cover the entire key
     805              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     806              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     807              :     ///      based on some of these factors:
     808              :     ///      a) whether this layer has any wal in this partition range
     809              :     ///      b) the size of the layer
     810              :     ///      c) the number of images needed to cover it
     811              :     ///      d) the estimated time until we'll have to reimage over it for GC
     812              :     ///
     813              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     814              :     /// them reimage-worthy only when the entire key space can be covered by very few
     815              :     /// images (currently 1).
     816              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     817              :     ///      implement that we need to plumb a lot more context into this function
     818              :     ///      than just the current partition_range.
     819          300 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     820              :         // Case 1
     821          300 :         if !Self::is_l0(&layer.key_range, layer.is_delta) {
     822            0 :             return true;
     823          300 :         }
     824              : 
     825              :         // Case 2
     826          300 :         if partition_range == &(Key::MIN..Key::MAX) {
     827            0 :             return true;
     828          300 :         }
     829              : 
     830          300 :         false
     831          300 :     }
     832              : 
     833              :     /// Count the height of the tallest stack of reimage-worthy deltas
     834              :     /// in this 2d region.
     835              :     ///
     836              :     /// If `limit` is provided we don't try to count above that number.
     837              :     ///
     838              :     /// This number is used to compute the largest number of deltas that
     839              :     /// we'll need to visit for any page reconstruction in this region.
     840              :     /// We use this heuristic to decide whether to create an image layer.
     841          607 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     842              :         // We get the delta coverage of the region, and for each part of the coverage
     843              :         // we recurse right underneath the delta. The recursion depth is limited by
     844              :         // the largest result this function could return, which is in practice between
     845              :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     846              : 
     847          607 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     848            0 :             return 0;
     849          607 :         }
     850              : 
     851          607 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     852          307 :             Some(v) => v,
     853          300 :             None => return 0,
     854              :         };
     855              : 
     856          307 :         let start = key.start.to_i128();
     857          307 :         let end = key.end.to_i128();
     858              : 
     859              :         // Initialize loop variables
     860          307 :         let mut max_stacked_deltas = 0;
     861          307 :         let mut current_key = start;
     862          307 :         let mut current_val = version.delta_coverage.query(start);
     863              : 
     864              :         // Loop through the delta coverage and recurse on each part
     865          307 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     866              :             // If there's a relevant delta in this part, add 1 and recurse down
     867           50 :             if let Some(val) = &current_val {
     868           50 :                 if val.get_lsn_range().end > lsn.start {
     869           50 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     870           50 :                     let lr = lsn.start..val.get_lsn_range().start;
     871           50 :                     if !kr.is_empty() {
     872            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
     873            0 :                         let new_limit = limit.map(|l| l - base_count);
     874            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     875            0 :                         max_stacked_deltas = std::cmp::max(
     876            0 :                             max_stacked_deltas,
     877            0 :                             base_count + max_stacked_deltas_underneath,
     878            0 :                         );
     879           50 :                     }
     880            0 :                 }
     881            0 :             }
     882              : 
     883           50 :             current_key = change_key;
     884           50 :             current_val.clone_from(&change_val);
     885              :         }
     886              : 
     887              :         // Consider the last part
     888          307 :         if let Some(val) = &current_val {
     889          300 :             if val.get_lsn_range().end > lsn.start {
     890          300 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     891          300 :                 let lr = lsn.start..val.get_lsn_range().start;
     892              : 
     893          300 :                 if !kr.is_empty() {
     894          300 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     895          300 :                     let new_limit = limit.map(|l| l - base_count);
     896          300 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     897          300 :                     max_stacked_deltas = std::cmp::max(
     898          300 :                         max_stacked_deltas,
     899          300 :                         base_count + max_stacked_deltas_underneath,
     900          300 :                     );
     901            0 :                 }
     902            0 :             }
     903            7 :         }
     904              : 
     905          307 :         max_stacked_deltas
     906          607 :     }
     907              : 
     908              :     /* BEGIN_HADRON */
     909              :     /**
     910              :      * Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully.
     911              :      * It works by first finding the latest image layers and storing them in a map. Then for each delta layer,
     912              :      * find all overlapping image layers in order to potentially increase the image LSN in case there are gaps
     913              :      * (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase
     914              :      * image LSN to 150 because there is no WAL record in between).
     915              :      * Finally, the image consistent LSN is computed by taking the minimum of all image layers.
     916              :      */
     917            8 :     pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn {
     918              :         struct ImageLayerInfo {
     919              :             // creation LSN of the image layer
     920              :             image_lsn: Lsn,
     921              :             // the current minimum LSN of newer delta layers with overlapping key ranges
     922              :             min_delta_lsn: Lsn,
     923              :         }
     924            8 :         let started_at = Instant::now();
     925              : 
     926            8 :         let min_l0_deltas_lsn = {
     927            8 :             let l0_deltas = self.level0_deltas();
     928            8 :             l0_deltas
     929            8 :                 .iter()
     930            8 :                 .map(|layer| layer.get_lsn_range().start)
     931            8 :                 .min()
     932            8 :                 .unwrap_or(disk_consistent_lsn)
     933              :         };
     934            8 :         let global_key_range = Key::MIN..Key::MAX;
     935              : 
     936              :         // step 1: collect all most recent image layers into a map
     937              :         // map: end key to image_layer_info
     938            8 :         let mut image_map: BTreeMap<Key, ImageLayerInfo> = BTreeMap::new();
     939           31 :         for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) {
     940           31 :             let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0));
     941           31 :             image_map.insert(
     942           31 :                 img_range.end,
     943           31 :                 ImageLayerInfo {
     944           31 :                     image_lsn: img_lsn,
     945           31 :                     min_delta_lsn: min_l0_deltas_lsn,
     946           31 :                 },
     947              :             );
     948              :         }
     949              : 
     950              :         // step 2: go through all delta layers, and update the image layer info with overlapping
     951              :         // key ranges
     952           52 :         for layer in self.historic.iter() {
     953           52 :             if !layer.is_delta {
     954           18 :                 continue;
     955           34 :             }
     956           34 :             let delta_key_range = layer.get_key_range();
     957           34 :             let delta_lsn_range = layer.get_lsn_range();
     958           99 :             for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) {
     959           99 :                 debug_assert!(img_end_key >= &delta_key_range.start);
     960           99 :                 if delta_lsn_range.end > img_info.image_lsn {
     961           91 :                     // the delta layer includes WAL records after the image
     962           91 :                     // it's possible that the delta layer's start LSN < image LSN, which will be simply ignored by step 3
     963           91 :                     img_info.min_delta_lsn =
     964           91 :                         std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start);
     965           91 :                 }
     966           99 :                 if img_end_key >= &delta_key_range.end {
     967              :                     // we have fully processed all overlapping image layers
     968           34 :                     break;
     969           65 :                 }
     970              :             }
     971              :         }
     972              : 
     973              :         // step 3, go through all image layers and find the image consistent LSN
     974            8 :         let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap();
     975            8 :         let mut prev_key = Key::MIN;
     976           39 :         for (img_key, img_info) in image_map {
     977           31 :             tracing::debug!(
     978            0 :                 "Image layer {:?}:{} has min delta lsn {}",
     979            0 :                 Range {
     980            0 :                     start: prev_key,
     981            0 :                     end: img_key,
     982            0 :                 },
     983              :                 img_info.image_lsn,
     984              :                 img_info.min_delta_lsn,
     985              :             );
     986           31 :             let image_lsn = std::cmp::max(
     987           31 :                 img_info.image_lsn,
     988           31 :                 img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)),
     989              :             );
     990           31 :             img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn);
     991           31 :             prev_key = img_key;
     992              :         }
     993            8 :         tracing::info!(
     994            0 :             "computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. Processed {} layrs in total.",
     995              :             img_consistent_lsn,
     996              :             disk_consistent_lsn,
     997            0 :             started_at.elapsed().as_millis(),
     998            0 :             self.historic.len()
     999              :         );
    1000            8 :         img_consistent_lsn
    1001            8 :     }
    1002              : 
    1003              :     /* END_HADRON */
    1004              : 
    /// Return all L0 delta layers.
    ///
    /// This is a borrowed view of the map's `l0_delta_layers` vector; nothing is cloned.
    pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
        &self.l0_delta_layers
    }
    1009              : 
    /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
    ///
    /// Each call creates a fresh receiver from the `watch_l0_deltas` sender.
    pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
        self.watch_l0_deltas.subscribe()
    }
    1014              : 
    1015              :     /// debugging function to print out the contents of the layer map
    1016              :     #[allow(unused)]
    1017            1 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
    1018            1 :         println!("Begin dump LayerMap");
    1019              : 
    1020            1 :         println!("open_layer:");
    1021            1 :         if let Some(open_layer) = &self.open_layer {
    1022            0 :             open_layer.dump(verbose, ctx).await?;
    1023            1 :         }
    1024              : 
    1025            1 :         println!("frozen_layers:");
    1026            1 :         for frozen_layer in self.frozen_layers.iter() {
    1027            0 :             frozen_layer.dump(verbose, ctx).await?;
    1028              :         }
    1029              : 
    1030            1 :         println!("historic_layers:");
    1031            6 :         for desc in self.iter_historic_layers() {
    1032            6 :             desc.dump();
    1033            6 :         }
    1034            1 :         println!("End dump LayerMap");
    1035            1 :         Ok(())
    1036            1 :     }
    1037              : 
    1038              :     /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
    1039              :     /// where we expect to serve reads.
    1040              :     ///
    1041              :     /// This function is O(N) and should be called infrequently.  The caller is responsible for
    1042              :     /// looking up and updating the Layer objects for these layer descriptors.
    1043          125 :     pub fn get_visibility(
    1044          125 :         &self,
    1045          125 :         mut read_points: Vec<Lsn>,
    1046          125 :     ) -> (
    1047          125 :         Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
    1048          125 :         KeySpace,
    1049          125 :     ) {
    1050              :         // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
    1051              :         // KeySpace is intended to be composed statically and iterated over.
    1052              :         struct KeyShadow {
    1053              :             // Map of range start to range end
    1054              :             inner: RangeSetBlaze<i128>,
    1055              :         }
    1056              : 
    1057              :         impl KeyShadow {
    1058          125 :             fn new() -> Self {
    1059          125 :                 Self {
    1060          125 :                     inner: Default::default(),
    1061          125 :                 }
    1062          125 :             }
    1063              : 
    1064          810 :             fn contains(&self, range: Range<Key>) -> bool {
    1065          810 :                 let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
    1066          810 :                 self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
    1067          810 :                     CheckSortedDisjoint::from([range_incl]),
    1068          810 :                 ))
    1069          810 :             }
    1070              : 
    1071              :             /// Add the input range to the keys covered by self.
    1072              :             ///
    1073              :             /// Return true if inserting this range covered some keys that were previously not covered
    1074          957 :             fn cover(&mut self, insert: Range<Key>) -> bool {
    1075          957 :                 let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
    1076          957 :                 self.inner.ranges_insert(range_incl)
    1077          957 :             }
    1078              : 
    1079          128 :             fn reset(&mut self) {
    1080          128 :                 self.inner = Default::default();
    1081          128 :             }
    1082              : 
    1083          125 :             fn to_keyspace(&self) -> KeySpace {
    1084          125 :                 let mut accum = KeySpaceAccum::new();
    1085          127 :                 for range_incl in self.inner.ranges() {
    1086          127 :                     let range = Range {
    1087          127 :                         start: Key::from_i128(*range_incl.start()),
    1088          127 :                         end: Key::from_i128(range_incl.end() + 1),
    1089          127 :                     };
    1090          127 :                     accum.add_range(range)
    1091              :                 }
    1092              : 
    1093          125 :                 accum.to_keyspace()
    1094          125 :             }
    1095              :         }
    1096              : 
    1097              :         // The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
    1098              :         // and a ReadPoint
    1099          125 :         read_points.sort_by_key(|rp| rp.0);
    1100          125 :         let mut shadow = KeyShadow::new();
    1101              : 
    1102              :         // We will interleave all our read points and layers into a sorted collection
    1103              :         enum Item {
    1104              :             ReadPoint { lsn: Lsn },
    1105              :             Layer(Arc<PersistentLayerDesc>),
    1106              :         }
    1107              : 
    1108          125 :         let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
    1109          125 :         items.extend(self.iter_historic_layers().map(Item::Layer));
    1110          125 :         items.extend(
    1111          125 :             read_points
    1112          125 :                 .into_iter()
    1113          128 :                 .map(|rp| Item::ReadPoint { lsn: rp }),
    1114              :         );
    1115              : 
    1116              :         // Ordering: we want to iterate like this:
    1117              :         // 1. Highest LSNs first
    1118              :         // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
    1119              :         // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
    1120        31936 :         items.sort_by_key(|item| {
    1121        31936 :             std::cmp::Reverse(match item {
    1122        31701 :                 Item::Layer(layer) => {
    1123        31701 :                     if layer.is_delta() {
    1124        14950 :                         (Lsn(layer.get_lsn_range().end.0 - 1), 0)
    1125              :                     } else {
    1126        16751 :                         (layer.image_layer_lsn(), 1)
    1127              :                     }
    1128              :                 }
    1129          235 :                 Item::ReadPoint { lsn } => (*lsn, 2),
    1130              :             })
    1131        31936 :         });
    1132              : 
    1133          125 :         let mut results = Vec::with_capacity(self.historic.len());
    1134              : 
    1135          125 :         let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
    1136              : 
    1137         2020 :         for item in items {
    1138         1895 :             let (reached_lsn, is_readpoint) = match &item {
    1139          128 :                 Item::ReadPoint { lsn } => (lsn, true),
    1140         1767 :                 Item::Layer(layer) => (&layer.lsn_range.start, false),
    1141              :             };
    1142         1895 :             maybe_covered_deltas.retain(|d| {
    1143           50 :                 if *reached_lsn >= d.lsn_range.start && is_readpoint {
    1144              :                     // We encountered a readpoint within the delta layer: it is visible
    1145              : 
    1146            1 :                     results.push((d.clone(), LayerVisibilityHint::Visible));
    1147            1 :                     false
    1148           49 :                 } else if *reached_lsn < d.lsn_range.start {
    1149              :                     // We passed the layer's range without encountering a read point: it is not visible
    1150           19 :                     results.push((d.clone(), LayerVisibilityHint::Covered));
    1151           19 :                     false
    1152              :                 } else {
    1153              :                     // We're still in the delta layer: continue iterating
    1154           30 :                     true
    1155              :                 }
    1156           50 :             });
    1157              : 
    1158         1895 :             match item {
    1159          128 :                 Item::ReadPoint { lsn: _lsn } => {
    1160          128 :                     // TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
    1161          128 :                     // to assume that the whole key range is visible at the branch point.
    1162          128 :                     shadow.reset();
    1163          128 :                 }
    1164         1767 :                 Item::Layer(layer) => {
    1165         1767 :                     let visibility = if layer.is_delta() {
    1166          810 :                         if shadow.contains(layer.get_key_range()) {
    1167              :                             // If a layer isn't visible based on current state, we must defer deciding whether
    1168              :                             // it is truly not visible until we have advanced past the delta's range: we might
    1169              :                             // encounter another branch point within this delta layer's LSN range.
    1170           22 :                             maybe_covered_deltas.push(layer);
    1171           22 :                             continue;
    1172              :                         } else {
    1173          788 :                             LayerVisibilityHint::Visible
    1174              :                         }
    1175              :                     } else {
    1176          957 :                         let modified = shadow.cover(layer.get_key_range());
    1177          957 :                         if modified {
    1178              :                             // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
    1179          951 :                             LayerVisibilityHint::Visible
    1180              :                         } else {
    1181              :                             // An image layer in a region that was already covered
    1182            6 :                             LayerVisibilityHint::Covered
    1183              :                         }
    1184              :                     };
    1185              : 
    1186         1745 :                     results.push((layer, visibility));
    1187              :                 }
    1188              :             }
    1189              :         }
    1190              : 
    1191              :         // Drain any remaining maybe_covered deltas
    1192          125 :         results.extend(
    1193          125 :             maybe_covered_deltas
    1194          125 :                 .into_iter()
    1195          125 :                 .map(|d| (d, LayerVisibilityHint::Covered)),
    1196              :         );
    1197              : 
    1198          125 :         (results, shadow.to_keyspace())
    1199          125 :     }
    1200              : }
    1201              : 
    1202              : #[cfg(test)]
    1203              : mod tests {
    1204              :     use std::collections::HashMap;
    1205              :     use std::path::PathBuf;
    1206              : 
    1207              :     use crate::{
    1208              :         DEFAULT_PG_VERSION,
    1209              :         tenant::{harness::TenantHarness, storage_layer::LayerName},
    1210              :     };
    1211              :     use pageserver_api::key::DBDIR_KEY;
    1212              :     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
    1213              :     use tokio_util::sync::CancellationToken;
    1214              :     use utils::id::{TenantId, TimelineId};
    1215              :     use utils::shard::TenantShardId;
    1216              : 
    1217              :     use super::*;
    1218              :     use crate::tenant::IndexPart;
    1219              : 
    /// Minimal description of a historic layer, used by the tests to populate a `LayerMap`
    /// (see `create_layer_map`, which forwards the fields to `PersistentLayerDesc::new_test`).
    #[derive(Clone)]
    struct LayerDesc {
        /// Keys covered by the layer.
        key_range: Range<Key>,
        /// LSNs covered by the layer.
        lsn_range: Range<Lsn>,
        /// Whether the layer is a delta layer; the tests use `false` for image layers.
        is_delta: bool,
    }
    1226              : 
    1227            1 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
    1228            1 :         let mut layer_map = LayerMap::default();
    1229              : 
    1230            6 :         for layer in layers {
    1231            5 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
    1232            5 :                 layer.key_range,
    1233            5 :                 layer.lsn_range,
    1234            5 :                 layer.is_delta,
    1235            5 :             ));
    1236            5 :         }
    1237              : 
    1238            1 :         layer_map.flush_updates();
    1239            1 :         layer_map
    1240            1 :     }
    1241              : 
    1242         3541 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
    1243         3541 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
    1244         3541 :             .found
    1245         3541 :             .into_iter()
    1246         6820 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1247         3541 :             .collect();
    1248         3541 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
    1249         3541 :             .found
    1250         3541 :             .into_iter()
    1251         6820 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1252         3541 :             .collect();
    1253              : 
    1254         3541 :         assert_eq!(lhs, rhs);
    1255         3541 :     }
    1256              : 
    /// Reference implementation of a range search: runs `LayerMap::search` for every single
    /// key in `key_range` at `end_lsn` and groups the keys by the search result they produced.
    /// Keys for which the search finds nothing are left out.
    #[cfg(test)]
    fn brute_force_range_search(
        layer_map: &LayerMap,
        key_range: Range<Key>,
        end_lsn: Lsn,
    ) -> RangeSearchResult {
        let mut expected = RangeSearchResult::new();

        let mut cursor = key_range.start;
        while cursor != key_range.end {
            if let Some(hit) = layer_map.search(cursor, end_lsn) {
                expected.found.entry(hit).or_default().add_key(cursor);
            }
            cursor = cursor.next();
        }

        expected
    }
    1281              : 
    /// A range search on a layer map without any layers must find nothing.
    #[test]
    fn ranged_search_on_empty_layer_map() {
        let empty_map = LayerMap::default();
        let key_range = Key::from_i128(100)..Key::from_i128(200);

        assert_range_search_result_eq(
            empty_map.range_search(key_range, Lsn(100)),
            RangeSearchResult::new(),
        );
    }
    1290              : 
    /// Checks `LayerMap::range_search` against `brute_force_range_search` for every non-empty
    /// key sub-range of 0..60 at LSN 100, first with only historic layers in the map and then
    /// again after a frozen in-memory layer has been added.
    #[tokio::test]
    async fn ranged_search() {
        let harness = TenantHarness::create("ranged_search").await.unwrap();
        let (tenant, ctx) = harness.load().await;
        let cancel = CancellationToken::new();
        let timeline_id = TimelineId::generate();
        // Create the timeline such that the in-memory layers can be written
        // to the timeline directory.
        tenant
            .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        let gate = utils::sync::gate::Gate::default();
        // Creates an in-memory layer starting at `lsn_range.start`, freezes it at
        // `lsn_range.end` and appends it to the map's frozen layers.
        let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range<Lsn>| {
            let layer = InMemoryLayer::create(
                harness.conf,
                timeline_id,
                harness.tenant_shard_id,
                lsn_range.start,
                &gate,
                &cancel,
                &ctx,
            )
            .await
            .unwrap();

            layer.freeze(lsn_range.end).await;

            layer_map.frozen_layers.push_back(Arc::new(layer));
        };

        // Each entry lists the in-memory layers to add before the next round of checks.
        let in_memory_layer_configurations = [
            vec![],
            // Overlaps with the top-most image
            vec![Lsn(35)..Lsn(50)],
        ];

        // Historic layers: a mix of image layers (`is_delta: false`) and delta layers with
        // overlapping key and LSN ranges.
        let layers = vec![
            LayerDesc {
                key_range: Key::from_i128(15)..Key::from_i128(50),
                lsn_range: Lsn(5)..Lsn(6),
                is_delta: false,
            },
            LayerDesc {
                key_range: Key::from_i128(10)..Key::from_i128(20),
                lsn_range: Lsn(5)..Lsn(20),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(15)..Key::from_i128(25),
                lsn_range: Lsn(20)..Lsn(30),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(35)..Key::from_i128(40),
                lsn_range: Lsn(25)..Lsn(35),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(35)..Key::from_i128(40),
                lsn_range: Lsn(40)..Lsn(41),
                is_delta: false,
            },
        ];

        // The map is built once and reused: in-memory layers added for one configuration
        // stay in place for the following ones.
        let mut layer_map = create_layer_map(layers.clone());
        for in_memory_layers in in_memory_layer_configurations {
            for in_mem_layer_range in in_memory_layers {
                add_in_memory_layer(&mut layer_map, in_mem_layer_range).await;
            }

            // Exhaustively compare the ranged search with the per-key search.
            for start in 0..60 {
                for end in (start + 1)..60 {
                    let range = Key::from_i128(start)..Key::from_i128(end);
                    let result = layer_map.range_search(range.clone(), Lsn(100));
                    let expected = brute_force_range_search(&layer_map, range, Lsn(100));

                    eprintln!("{start}..{end}: {result:?}");

                    assert_range_search_result_eq(result, expected);
                }
            }
        }
    }
    1376              : 
    /// Smoke test for `LayerMap::get_visibility` on a small synthetic layer map with three
    /// read points, checking the hint computed for every layer and the returned shadow.
    #[test]
    fn layer_visibility_basic() {
        // A simple synthetic input, as a smoke test.
        let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
        let timeline_id = TimelineId::generate();
        let mut layer_map = LayerMap::default();
        let mut updates = layer_map.batch_update();

        // The layer size is irrelevant to visibility; both helpers use the same placeholder.
        const FAKE_LAYER_SIZE: u64 = 1024;

        // Inserts a delta layer covering the given key and LSN ranges and returns its descriptor.
        let inject_delta = |updates: &mut BatchedUpdates,
                            key_start: i128,
                            key_end: i128,
                            lsn_start: u64,
                            lsn_end: u64| {
            let desc = PersistentLayerDesc::new_delta(
                tenant_shard_id,
                timeline_id,
                Range {
                    start: Key::from_i128(key_start),
                    end: Key::from_i128(key_end),
                },
                Range {
                    start: Lsn(lsn_start),
                    end: Lsn(lsn_end),
                },
                FAKE_LAYER_SIZE,
            );
            updates.insert_historic(desc.clone());
            desc
        };

        // Inserts an image layer covering the given key range at `lsn` and returns its descriptor.
        let inject_image =
            |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
                let desc = PersistentLayerDesc::new_img(
                    tenant_shard_id,
                    timeline_id,
                    Range {
                        start: Key::from_i128(key_start),
                        end: Key::from_i128(key_end),
                    },
                    Lsn(lsn),
                    FAKE_LAYER_SIZE,
                );
                updates.insert_historic(desc.clone());
                desc
            };

        //
        // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
        // we expect to handle.  You can follow these examples through in the same order as they would be processed
        // by the function under test.
        //

        let mut read_points = vec![Lsn(1000)];

        // A delta ahead of any image layer
        let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);

        // An image layer is visible and covers some layers beneath itself
        let visible_covering_img = inject_image(&mut updates, 5, 25, 99);

        // A delta layer covered by the image layer: should be covered
        let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);

        // A delta layer partially covered by an image layer: should be visible
        let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);

        // A delta layer not covered by an image layer: should be visible
        let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);

        // An image layer covered by the image layer above: should be covered
        let covered_image = inject_image(&mut updates, 10, 20, 89);

        // An image layer partially covered by an image layer: should be visible
        let partially_covered_image = inject_image(&mut updates, 1, 7, 89);

        // An image layer not covered by an image layer: should be visible
        let not_covered_image = inject_image(&mut updates, 1, 4, 89);

        // A read point: this will make subsequent layers below here visible, even if there are
        // more recent layers covering them.
        read_points.push(Lsn(80));

        // A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
        let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);

        // An image layer between two read points: visible, and covers the deltas beneath it
        let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);

        // A delta layer under that image with no read point inside its LSN range: should be covered
        let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);

        read_points.push(Lsn(65));

        // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
        // the read point should make it visible, even though its end LSN is covered
        let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);

        // An image layer at the same LSN as the last read point: should be visible
        let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);

        updates.flush();

        let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
        let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();

        // (layer name, descriptor, expected hint); the name only serves the failure message.
        let expectations = [
            ("ahead_layer", &ahead_layer, LayerVisibilityHint::Visible),
            (
                "visible_covering_img",
                &visible_covering_img,
                LayerVisibilityHint::Visible,
            ),
            ("covered_delta", &covered_delta, LayerVisibilityHint::Covered),
            (
                "partially_covered_delta",
                &partially_covered_delta,
                LayerVisibilityHint::Visible,
            ),
            (
                "not_covered_delta",
                &not_covered_delta,
                LayerVisibilityHint::Visible,
            ),
            ("covered_image", &covered_image, LayerVisibilityHint::Covered),
            (
                "partially_covered_image",
                &partially_covered_image,
                LayerVisibilityHint::Visible,
            ),
            (
                "not_covered_image",
                &not_covered_image,
                LayerVisibilityHint::Visible,
            ),
            (
                "covered_delta_below_read_point",
                &covered_delta_below_read_point,
                LayerVisibilityHint::Visible,
            ),
            (
                "covering_img_between_read_points",
                &covering_img_between_read_points,
                LayerVisibilityHint::Visible,
            ),
            (
                "covered_delta_between_read_points",
                &covered_delta_between_read_points,
                LayerVisibilityHint::Covered,
            ),
            (
                "covered_delta_intersects_read_point",
                &covered_delta_intersects_read_point,
                LayerVisibilityHint::Visible,
            ),
            (
                "visible_img_after_last_read_point",
                &visible_img_after_last_read_point,
                LayerVisibilityHint::Visible,
            ),
        ];
        for (name, desc, expected) in expectations {
            assert_eq!(
                layer_visibilities.get(desc),
                Some(&expected),
                "unexpected visibility hint for {name}"
            );
        }

        // Shadow should include all the images below the last read point
        let expected_shadow = KeySpace {
            ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
        };
        assert_eq!(shadow, expected_shadow);
    }
    1537              : 
    1538            1 :     fn fixture_path(relative: &str) -> PathBuf {
    1539            1 :         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
    1540            1 :     }
    1541              : 
    /// Loads a large index fixture, computes layer visibility at its disk-consistent LSN,
    /// and cross-checks every hint against a brute-force scan of the image layers.
    #[test]
    fn layer_visibility_realistic() {
        // Load a large example layermap
        let index_path = fixture_path("test_data/indices/mixed_workload/index_part.json");
        let index_raw = std::fs::read_to_string(index_path).unwrap();
        let index: IndexPart = serde_json::from_str(&index_raw).unwrap();

        let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
        let timeline_id = TimelineId::generate();

        // Turn every entry of the index into a historic layer of a fresh map.
        let mut layer_map = LayerMap::default();
        let mut updates = layer_map.batch_update();
        for (layer_name, layer_metadata) in index.layer_metadata {
            let file_size = layer_metadata.file_size;
            let (key_range, lsn_range, is_delta) = match layer_name {
                LayerName::Image(image_name) => (
                    image_name.key_range.clone(),
                    image_name.lsn_as_range(),
                    false,
                ),
                LayerName::Delta(delta_name) => {
                    (delta_name.key_range, delta_name.lsn_range, true)
                }
            };
            updates.insert_historic(PersistentLayerDesc {
                key_range,
                lsn_range,
                tenant_shard_id,
                timeline_id,
                is_delta,
                file_size,
            });
        }
        updates.flush();

        let read_points = vec![index.metadata.disk_consistent_lsn()];
        let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
        for (layer_desc, visibility) in layer_visibilities.iter() {
            tracing::info!("{layer_desc:?}: {visibility:?}");
            eprintln!("{layer_desc:?}: {visibility:?}");
        }

        // The shadow should be non-empty, since there were some image layers
        assert!(!shadow.ranges.is_empty());

        // At least some layers should be marked covered
        let any_covered = layer_visibilities
            .iter()
            .any(|(_, visibility)| matches!(visibility, LayerVisibilityHint::Covered));
        assert!(any_covered);

        let layer_visibilities: HashMap<_, _> = layer_visibilities.into_iter().collect();

        // Brute force validation: a layer should be marked covered if and only if there are
        // image layers above it in LSN order which cover it
        for (layer_desc, visible) in &layer_visibilities {
            let mut coverage = KeySpaceRandomAccum::new();
            let mut covered_by = Vec::new();

            for other_layer in layer_map.iter_historic_layers() {
                // Only *other* image layers can contribute coverage; deltas are skipped
                // before `image_layer_lsn()` is ever called on them.
                if &other_layer == layer_desc || other_layer.is_delta() {
                    continue;
                }

                let is_above =
                    other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1);
                let keys_touch = other_layer.key_range.start <= layer_desc.key_range.end
                    && layer_desc.key_range.start <= other_layer.key_range.end;

                if is_above && keys_touch {
                    coverage.add_range(other_layer.get_key_range());
                    covered_by.push((*other_layer).clone());
                }
            }
            let coverage = coverage.to_keyspace();

            // Covered means: one contiguous range containing both the first and the last key.
            let fully_covered = coverage.ranges.len() == 1
                && coverage.contains(&layer_desc.key_range.start)
                && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1));
            let expect_visible = if fully_covered {
                LayerVisibilityHint::Covered
            } else {
                LayerVisibilityHint::Visible
            };

            // Dump diagnostics before the assertion below fires.
            if expect_visible != *visible {
                eprintln!(
                    "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
                    layer_desc.key_range.start,
                    layer_desc.key_range.end,
                    layer_desc.lsn_range.start,
                    layer_desc.lsn_range.end,
                    layer_desc.is_delta()
                );
                if expect_visible == LayerVisibilityHint::Covered {
                    eprintln!("Covered by:");
                    for other in covered_by.iter() {
                        eprintln!(
                            "  {}..{} @ {}",
                            other.get_key_range().start,
                            other.get_key_range().end,
                            other.image_layer_lsn()
                        );
                    }
                    match coverage.ranges.first() {
                        Some(range) => eprintln!(
                            "Total coverage from contributing layers: {}..{}",
                            range.start, range.end
                        ),
                        None => eprintln!(
                            "Total coverage from contributing layers: {:?}",
                            coverage.ranges
                        ),
                    }
                }
            }
            assert_eq!(expect_visible, *visible);
        }

        // Sanity: the layer that holds latest data for the DBDIR key should always be visible
        // (just using this key as a key that will always exist for any layermap fixture)
        let readable_layer = layer_map
            .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
            .unwrap();
        let dbdir_layer = match readable_layer.layer {
            ReadableLayerWeak::PersistentLayer(desc) => desc,
            ReadableLayerWeak::InMemoryLayer(_) => unreachable!(""),
        };
        let dbdir_visibility = layer_visibilities.get(&dbdir_layer).unwrap();
        assert!(matches!(dbdir_visibility, LayerVisibilityHint::Visible));
    }
    1680              : 
    1681              :     /* BEGIN_HADRON */
    /// Walks `compute_image_consistent_lsn` through a layer map that grows case by case,
    /// checking the reported LSN after each batch of inserted layers.
    #[test]
    fn test_compute_image_consistent_lsn() {
        /// Shorthand for a test layer descriptor covering `keys` over `lsns`.
        fn layer(
            keys: std::ops::Range<i128>,
            lsns: std::ops::Range<u64>,
            is_delta: bool,
        ) -> PersistentLayerDesc {
            PersistentLayerDesc::new_test(
                Key::from_i128(keys.start)..Key::from_i128(keys.end),
                Lsn(lsns.start)..Lsn(lsns.end),
                is_delta,
            )
        }

        let disk_consistent_lsn = Lsn(1000);
        let mut layer_map = LayerMap::default();

        // case 1: empty layer map
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(), observed);

        // case 2: only L0 delta layer
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..100, 900..990, true));
            batch.insert_historic(layer(0..100, 850..899, true));
        }
        // should use min L0 delta LSN - 1 as image consistent LSN
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(849), observed);

        // case 3: 3 images, no L1 delta
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..40, 100..100, false));
            batch.insert_historic(layer(40..70, 200..200, false));
            batch.insert_historic(layer(70..100, 150..150, false));
        }
        // should use min L0 delta LSN - 1 as image consistent LSN
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(849), observed);

        // case 4: 3 images with 1 L1 delta
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..50, 300..350, true));
        }
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(299), observed);

        // case 5: 3 images with 1 more L1 delta with smaller LSN
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(50..72, 200..300, true));
        }
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(199), observed);

        // case 6: 3 images with more newer L1 deltas (no impact on final results)
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..30, 400..500, true));
            batch.insert_historic(layer(35..100, 450..600, true));
        }
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(199), observed);

        // case 7: 3 images with more older L1 deltas (no impact on final results)
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..40, 0..50, true));
            batch.insert_historic(layer(50..100, 10..60, true));
        }
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(199), observed);

        // case 8: 3 images with one more L1 delta with overlapping LSN range
        {
            let mut batch = layer_map.batch_update();
            batch.insert_historic(layer(0..50, 50..250, true));
        }
        let observed = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
        assert_eq!(Lsn(100), observed);
    }
    1810              : 
    1811              :     /* END_HADRON */
    1812              : }
    1813              : 
    1814              : #[cfg(test)]
    1815              : mod select_layer_tests {
    1816              :     use super::*;
    1817              : 
    1818           17 :     fn create_persistent_layer(
    1819           17 :         start_lsn: u64,
    1820           17 :         end_lsn: u64,
    1821           17 :         is_delta: bool,
    1822           17 :     ) -> Arc<PersistentLayerDesc> {
    1823           17 :         if !is_delta {
    1824            8 :             assert_eq!(end_lsn, start_lsn + 1);
    1825            9 :         }
    1826              : 
    1827           17 :         Arc::new(PersistentLayerDesc::new_test(
    1828           17 :             Key::MIN..Key::MAX,
    1829           17 :             Lsn(start_lsn)..Lsn(end_lsn),
    1830           17 :             is_delta,
    1831              :         ))
    1832           17 :     }
    1833              : 
    1834            6 :     fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc {
    1835            6 :         InMemoryLayerDesc {
    1836            6 :             handle: InMemoryLayerHandle::Open,
    1837            6 :             lsn_range: Lsn(start_lsn)..Lsn(end_lsn),
    1838            6 :         }
    1839            6 :     }
    1840              : 
    /// With no candidate layers at all, nothing is selected.
    #[test]
    fn test_select_layer_empty() {
        let selected = LayerMap::select_layer(None, None, None, Lsn(100));
        assert!(selected.is_none());
    }
    1845              : 
    /// A lone delta candidate is selected, with the floor at its start LSN.
    #[test]
    fn test_select_layer_only_delta() {
        let delta = create_persistent_layer(10, 20, true);

        let picked =
            LayerMap::select_layer(Some(Arc::clone(&delta)), None, None, Lsn(100)).unwrap();

        assert_eq!(picked.lsn_floor, Lsn(10));
        assert!(match picked.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta),
            _ => false,
        });
    }
    1856              : 
    /// A lone image candidate is selected, with the floor at its LSN.
    #[test]
    fn test_select_layer_only_image() {
        let image = create_persistent_layer(10, 11, false);

        let picked =
            LayerMap::select_layer(None, Some(Arc::clone(&image)), None, Lsn(100)).unwrap();

        assert_eq!(picked.lsn_floor, Lsn(10));
        assert!(match picked.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });
    }
    1867              : 
    /// A lone in-memory candidate is selected, with the floor at its start LSN.
    #[test]
    fn test_select_layer_only_inmem() {
        let inmem = create_inmem_layer(10, 20);

        let picked = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap();

        assert_eq!(picked.lsn_floor, Lsn(10));
        assert!(match picked.layer {
            ReadableLayerWeak::InMemoryLayer(l) => l == inmem,
            _ => false,
        });
    }
    1876              : 
    /// An image inside the delta's LSN range: the delta is returned first with its floor
    /// raised to just above the image, then the image itself.
    #[test]
    fn test_select_layer_image_inside_delta() {
        let delta = create_persistent_layer(10, 20, true);
        let image = create_persistent_layer(15, 16, false);

        // Both lookups offer the same candidates; only the upper LSN bound changes.
        let select = |end_lsn: Lsn| {
            LayerMap::select_layer(
                Some(Arc::clone(&delta)),
                Some(Arc::clone(&image)),
                None,
                end_lsn,
            )
            .unwrap()
        };

        let upper = select(Lsn(100));
        assert_eq!(upper.lsn_floor, Lsn(16));
        assert!(match upper.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta),
            _ => false,
        });

        let lower = select(upper.lsn_floor);
        assert_eq!(lower.lsn_floor, Lsn(15));
        assert!(match lower.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });
    }
    1904              : 
    /// An image newer than the delta is returned first; a follow-up lookup offering only
    /// the delta then returns the delta.
    #[test]
    fn test_select_layer_newer_image() {
        let delta = create_persistent_layer(10, 20, true);
        let image = create_persistent_layer(25, 26, false);

        let first = LayerMap::select_layer(
            Some(Arc::clone(&delta)),
            Some(Arc::clone(&image)),
            None,
            Lsn(30),
        )
        .unwrap();
        assert_eq!(first.lsn_floor, Lsn(25));
        assert!(match first.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });

        let second =
            LayerMap::select_layer(Some(Arc::clone(&delta)), None, None, first.lsn_floor).unwrap();
        assert_eq!(second.lsn_floor, Lsn(10));
        assert!(match second.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta),
            _ => false,
        });
    }
    1927              : 
    /// A delta sitting above an older image is returned first; a follow-up lookup offering
    /// only the image then returns the image.
    #[test]
    fn test_select_layer_delta_with_older_image() {
        let delta = create_persistent_layer(15, 25, true);
        let image = create_persistent_layer(10, 11, false);

        let first = LayerMap::select_layer(
            Some(Arc::clone(&delta)),
            Some(Arc::clone(&image)),
            None,
            Lsn(30),
        )
        .unwrap();
        assert_eq!(first.lsn_floor, Lsn(15));
        assert!(match first.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta),
            _ => false,
        });

        let second =
            LayerMap::select_layer(None, Some(Arc::clone(&image)), None, first.lsn_floor).unwrap();
        assert_eq!(second.lsn_floor, Lsn(10));
        assert!(match second.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });
    }
    1950              : 
    /// An image inside the in-memory layer's LSN range: the in-memory layer comes first
    /// (floor just above the image), then the image, then the in-memory layer again once
    /// it is the only candidate offered.
    #[test]
    fn test_select_layer_image_inside_inmem() {
        let image = create_persistent_layer(15, 16, false);
        let inmem = create_inmem_layer(10, 25);

        // The first two lookups offer both the image and the in-memory layer.
        let select_with_image = |end_lsn: Lsn| {
            LayerMap::select_layer(
                None,
                Some(Arc::clone(&image)),
                Some(inmem.clone()),
                end_lsn,
            )
            .unwrap()
        };

        let top = select_with_image(Lsn(30));
        assert_eq!(top.lsn_floor, Lsn(16));
        assert!(match top.layer {
            ReadableLayerWeak::InMemoryLayer(l) => l == inmem,
            _ => false,
        });

        let middle = select_with_image(top.lsn_floor);
        assert_eq!(middle.lsn_floor, Lsn(15));
        assert!(match middle.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });

        let bottom =
            LayerMap::select_layer(None, None, Some(inmem.clone()), middle.lsn_floor).unwrap();
        assert_eq!(bottom.lsn_floor, Lsn(10));
        assert!(match bottom.layer {
            ReadableLayerWeak::InMemoryLayer(l) => l == inmem,
            _ => false,
        });
    }
    1981              : 
    /// Deltas overlapping the in-memory layer: the in-memory layer comes first (floor at
    /// the top delta's end), then the top delta, then the bottom delta.
    #[test]
    fn test_select_layer_delta_inside_inmem() {
        let delta_top = create_persistent_layer(15, 20, true);
        let delta_bottom = create_persistent_layer(10, 15, true);
        let inmem = create_inmem_layer(15, 25);

        // Every lookup offers one delta plus the in-memory layer, and never an image.
        let select = |delta: &Arc<PersistentLayerDesc>, end_lsn: Lsn| {
            LayerMap::select_layer(Some(Arc::clone(delta)), None, Some(inmem.clone()), end_lsn)
                .unwrap()
        };

        let first = select(&delta_top, Lsn(30));
        assert_eq!(first.lsn_floor, Lsn(20));
        assert!(match first.layer {
            ReadableLayerWeak::InMemoryLayer(l) => l == inmem,
            _ => false,
        });

        let second = select(&delta_top, first.lsn_floor);
        assert_eq!(second.lsn_floor, Lsn(15));
        assert!(match second.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta_top),
            _ => false,
        });

        let third = select(&delta_bottom, second.lsn_floor);
        assert_eq!(third.lsn_floor, Lsn(10));
        assert!(match third.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta_bottom),
            _ => false,
        });
    }
    2020              : 
    /// In-memory, delta and image layers all overlap: repeated lookups with the same
    /// candidates return the in-memory layer, then the delta, then the image.
    #[test]
    fn test_select_layer_all_overlap_1() {
        let inmem = create_inmem_layer(10, 30);
        let delta = create_persistent_layer(15, 25, true);
        let image = create_persistent_layer(20, 21, false);

        // All three lookups offer the same candidates; only the upper LSN bound changes.
        let select = |end_lsn: Lsn| {
            LayerMap::select_layer(
                Some(Arc::clone(&delta)),
                Some(Arc::clone(&image)),
                Some(inmem.clone()),
                end_lsn,
            )
            .unwrap()
        };

        let first = select(Lsn(50));
        assert_eq!(first.lsn_floor, Lsn(25));
        assert!(match first.layer {
            ReadableLayerWeak::InMemoryLayer(l) => l == inmem,
            _ => false,
        });

        let second = select(first.lsn_floor);
        assert_eq!(second.lsn_floor, Lsn(21));
        assert!(match second.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &delta),
            _ => false,
        });

        let third = select(second.lsn_floor);
        assert_eq!(third.lsn_floor, Lsn(20));
        assert!(match third.layer {
            ReadableLayerWeak::PersistentLayer(l) => Arc::ptr_eq(&l, &image),
            _ => false,
        });
    }
    2064              : 
    /// All three candidates overlap, and the delta layer spans both the
    /// in-memory layer and the image layer.
    ///
    /// Starting from `Lsn(50)`, the delta layer is expected first (floor 26);
    /// reading again from that floor is expected to yield the image layer
    /// (floor 25). The in-memory layer is offered on both calls but is not the
    /// expected pick in either.
    ///
    /// NOTE(review): the helper arguments are presumably `(start_lsn, end_lsn)`
    /// and the trailing `bool` presumably marks a delta layer — confirm against
    /// the helper definitions.
    #[test]
    fn test_select_layer_all_overlap_2() {
        let inmem = create_inmem_layer(20, 30);
        let delta = create_persistent_layer(10, 40, true);
        let image = create_persistent_layer(25, 26, false);

        // Every step offers the same three candidates; only the read point moves.
        let select_at = |end_lsn| {
            LayerMap::select_layer(
                Some(delta.clone()),
                Some(image.clone()),
                Some(inmem.clone()),
                end_lsn,
            )
            .unwrap()
        };

        // Step 1: read from above every layer.
        let from_top = select_at(Lsn(50));
        assert_eq!(from_top.lsn_floor, Lsn(26));
        assert!(
            matches!(from_top.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
        );

        // Step 2: continue from the floor reported by step 1.
        let below_delta = select_at(from_top.lsn_floor);
        assert_eq!(below_delta.lsn_floor, Lsn(25));
        assert!(
            matches!(below_delta.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
        );
    }
    2097              : 
    /// The in-memory layer starts at the value passed as the delta layer's
    /// second argument, and the image layer lies inside the delta layer.
    ///
    /// Starting from `Lsn(50)` with all three candidates, the in-memory layer
    /// is expected first (floor 30). The follow-up reads offer only the
    /// persistent layers and are expected to yield the delta layer (floor 21)
    /// and then the image layer (floor 20).
    ///
    /// NOTE(review): the helper arguments are presumably `(start_lsn, end_lsn)`
    /// and the trailing `bool` presumably marks a delta layer — confirm against
    /// the helper definitions.
    #[test]
    fn test_select_layer_all_overlap_3() {
        let inmem = create_inmem_layer(30, 40);
        let delta = create_persistent_layer(10, 30, true);
        let image = create_persistent_layer(20, 21, false);

        // The persistent candidates are the same on every call; the in-memory
        // candidate and the read point vary per step.
        let select_with = |in_memory, end_lsn| {
            LayerMap::select_layer(
                Some(delta.clone()),
                Some(image.clone()),
                in_memory,
                end_lsn,
            )
            .unwrap()
        };

        // Step 1: all three candidates, read from above every layer.
        let from_top = select_with(Some(inmem.clone()), Lsn(50));
        assert_eq!(from_top.lsn_floor, Lsn(30));
        assert!(matches!(from_top.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));

        // Step 2: persistent candidates only, from the floor reported by step 1.
        let below_inmem = select_with(None, from_top.lsn_floor);
        assert_eq!(below_inmem.lsn_floor, Lsn(21));
        assert!(
            matches!(below_inmem.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
        );

        // Step 3: persistent candidates only, from the floor reported by step 2.
        let below_delta = select_with(None, below_inmem.lsn_floor);
        assert_eq!(below_delta.lsn_floor, Lsn(20));
        assert!(
            matches!(below_delta.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
        );
    }
    2141              : }
        

Generated by: LCOV version 2.1-beta