LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions)
Test:      6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info
Test Date: 2025-03-12 18:28:53
                 Coverage    Total      Hit
Lines:             92.3 %     1271     1173
Functions:         94.3 %       88       83

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! frozen and split up into new image and delta layers, and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
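As a rough, self-contained sketch of the versioning idea behind `historic_layer_coverage.rs` (assuming a hypothetical `ToyLayer` in place of `PersistentLayerDesc`, and using plain `BTreeMap`s with clone-on-snapshot where the real code uses a persistent tree that shares structure between versions):

    use std::collections::BTreeMap;

    /// Hypothetical, simplified layer description: where its key coverage starts
    /// and the LSN at which the layer starts.
    struct ToyLayer {
        key_start: i128,
        lsn_start: u64,
    }

    /// Coverage for one LSN "version": start key -> index of the covering layer.
    type Coverage = BTreeMap<i128, usize>;

    /// Insert layers in lsn_start order, snapshotting the coverage after each one.
    fn build_versions(mut layers: Vec<ToyLayer>) -> BTreeMap<u64, Coverage> {
        layers.sort_by_key(|l| l.lsn_start);
        let mut coverage = Coverage::new();
        let mut versions = BTreeMap::new();
        for (id, layer) in layers.iter().enumerate() {
            // (end-of-coverage bookkeeping omitted for brevity)
            coverage.insert(layer.key_start, id);
            versions.insert(layer.lsn_start, coverage.clone());
        }
        versions
    }

    /// Pick the newest version strictly below end_lsn, then the latest coverage
    /// change at or below the key.
    fn search(versions: &BTreeMap<u64, Coverage>, key: i128, end_lsn: u64) -> Option<usize> {
        let (_, coverage) = versions.range(..end_lsn).next_back()?;
        coverage.range(..=key).next_back().map(|(_, id)| *id)
    }
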
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use std::collections::{HashMap, VecDeque};
      50              : use std::iter::Peekable;
      51              : use std::ops::Range;
      52              : use std::sync::Arc;
      53              : 
      54              : use anyhow::Result;
      55              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      56              : pub use historic_layer_coverage::LayerKey;
      57              : use pageserver_api::key::Key;
      58              : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
      59              : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
      60              : use tokio::sync::watch;
      61              : use utils::lsn::Lsn;
      62              : 
      63              : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
      64              : use crate::context::RequestContext;
      65              : use crate::tenant::storage_layer::{InMemoryLayer, ReadableLayerWeak};
      66              : 
      67              : ///
      68              : /// LayerMap tracks what layers exist on a timeline.
      69              : ///
      70              : pub struct LayerMap {
      71              :     //
      72              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      73              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      74              :     // the start LSN of the next InMemoryLayer to be created.
      75              :     //
      76              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      77              :     pub next_open_layer_at: Option<Lsn>,
      78              : 
      79              :     ///
      80              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      81              :     /// are no longer added to, but haven't been written out to disk
      82              :     /// yet. They contain WAL older than the current 'open_layer' or
      83              :     /// 'next_open_layer_at', but newer than any historic layer.
      84              :     /// The frozen layers are in order from oldest to newest, so that
      85              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      86              :     /// in the 'front'.
      87              :     ///
      88              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      89              : 
      90              :     /// Index of the historic layers optimized for search
      91              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      92              : 
      93              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them through the historic layer index is very inefficient.
      94              :     /// So L0 layers are also held in the l0_delta_layers vector, in addition to the historic index.
      95              :     ///
      96              :     /// NB: make sure to notify `watch_l0_deltas` on changes.
      97              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      98              : 
      99              :     /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
     100              :     watch_l0_deltas: watch::Sender<usize>,
     101              : }
     102              : 
     103              : impl Default for LayerMap {
     104          920 :     fn default() -> Self {
     105          920 :         Self {
     106          920 :             open_layer: Default::default(),
     107          920 :             next_open_layer_at: Default::default(),
     108          920 :             frozen_layers: Default::default(),
     109          920 :             historic: Default::default(),
     110          920 :             l0_delta_layers: Default::default(),
     111          920 :             watch_l0_deltas: watch::channel(0).0,
     112          920 :         }
     113          920 :     }
     114              : }
     115              : 
     116              : /// The primary update API for the layer map.
     117              : ///
     118              : /// Batching historic layer insertions and removals is good for
     119              : /// performance and this struct helps us do that correctly.
     120              : #[must_use]
     121              : pub struct BatchedUpdates<'a> {
     122              :     // While we hold this exclusive reference to the layer map the type checker
     123              :     // will prevent us from accidentally reading any unflushed updates.
     124              :     layer_map: &'a mut LayerMap,
     125              : }
     126              : 
     127              : /// Provides the ability to batch more updates while hiding the read
     128              : /// API so we don't accidentally read without flushing.
     129              : impl BatchedUpdates<'_> {
     130              :     ///
     131              :     /// Insert an on-disk layer.
     132              :     ///
     133              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     134         9808 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     135         9808 :         self.layer_map.insert_historic_noflush(layer_desc)
     136         9808 :     }
     137              : 
     138              :     ///
     139              :     /// Remove an on-disk layer from the map.
     140              :     ///
     141              :     /// This should be called when the corresponding file on disk has been deleted.
     142              :     ///
     143         1032 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     144         1032 :         self.layer_map.remove_historic_noflush(layer_desc)
     145         1032 :     }
     146              : 
     147              :     // We will flush on drop anyway, but this method makes it
     148              :     // more explicit that there is some work being done.
     149              :     /// Apply all updates
     150         3584 :     pub fn flush(self) {
     151         3584 :         // Flush happens on drop
     152         3584 :     }
     153              : }
     154              : 
     155              : // Ideally the flush() method should be called explicitly for more
     156              : // controlled execution. But if we forget we'd rather flush on drop
     157              : // than panic later or read without flushing.
     158              : //
     159              : // TODO maybe warn if flush hasn't explicitly been called
     160              : impl Drop for BatchedUpdates<'_> {
     161         3584 :     fn drop(&mut self) {
     162         3584 :         self.layer_map.flush_updates();
     163         3584 :     }
     164              : }
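
A hypothetical caller of this batching API, assuming a `layer_map: &mut LayerMap` and already-built `PersistentLayerDesc` values `new_desc` and `old_desc` are in scope:

    let mut updates = layer_map.batch_update();
    updates.insert_historic(new_desc);
    updates.remove_historic(&old_desc);
    updates.flush(); // dropping `updates` without calling flush() would also flush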
     165              : 
     166              : /// Return value of LayerMap::search
     167              : #[derive(Eq, PartialEq, Debug, Hash)]
     168              : pub struct SearchResult {
     169              :     pub layer: ReadableLayerWeak,
     170              :     pub lsn_floor: Lsn,
     171              : }
     172              : 
     173              : /// Return value of [`LayerMap::range_search`]
     174              : ///
     175              : /// Contains a mapping from a layer description to a keyspace
     176              : /// accumulator that contains all the keys which intersect the layer
     177              : /// from the original search space.
     178              : #[derive(Debug)]
     179              : pub struct RangeSearchResult {
     180              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     181              : }
     182              : 
     183              : impl RangeSearchResult {
     184      1960579 :     fn new() -> Self {
     185      1960579 :         Self {
     186      1960579 :             found: HashMap::new(),
     187      1960579 :         }
     188      1960579 :     }
     189              : 
     190       671368 :     fn map_to_in_memory_layer(
     191       671368 :         in_memory_layer: Option<InMemoryLayerDesc>,
     192       671368 :         range: Range<Key>,
     193       671368 :     ) -> RangeSearchResult {
     194       671368 :         match in_memory_layer {
     195       218571 :             Some(inmem) => {
     196       218571 :                 let search_result = SearchResult {
     197       218571 :                     lsn_floor: inmem.get_lsn_range().start,
     198       218571 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     199       218571 :                 };
     200       218571 : 
     201       218571 :                 let mut accum = KeySpaceAccum::new();
     202       218571 :                 accum.add_range(range);
     203       218571 :                 RangeSearchResult {
     204       218571 :                     found: HashMap::from([(search_result, accum)]),
     205       218571 :                 }
     206              :             }
     207       452797 :             None => RangeSearchResult::new(),
     208              :         }
     209       671368 :     }
     210              : }
     211              : 
     212              : /// Collector for results of range search queries on the LayerMap.
     213              : /// It should be provided with two iterators for the delta and image coverage
     214              : /// that contain all the changes for layers which intersect the range.
     215              : struct RangeSearchCollector<Iter>
     216              : where
     217              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     218              : {
     219              :     in_memory_layer: Option<InMemoryLayerDesc>,
     220              :     delta_coverage: Peekable<Iter>,
     221              :     image_coverage: Peekable<Iter>,
     222              :     key_range: Range<Key>,
     223              :     end_lsn: Lsn,
     224              : 
     225              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     226              :     current_image: Option<Arc<PersistentLayerDesc>>,
     227              : 
     228              :     result: RangeSearchResult,
     229              : }
     230              : 
     231              : #[derive(Debug)]
     232              : enum NextLayerType {
     233              :     Delta(i128),
     234              :     Image(i128),
     235              :     Both(i128),
     236              : }
     237              : 
     238              : impl NextLayerType {
     239      2997992 :     fn next_change_at_key(&self) -> Key {
     240      2997992 :         match self {
     241       305190 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     242      1206640 :             NextLayerType::Image(at) => Key::from_i128(*at),
     243      1486162 :             NextLayerType::Both(at) => Key::from_i128(*at),
     244              :         }
     245      2997992 :     }
     246              : }
     247              : 
     248              : impl<Iter> RangeSearchCollector<Iter>
     249              : where
     250              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     251              : {
     252      1493618 :     fn new(
     253      1493618 :         key_range: Range<Key>,
     254      1493618 :         end_lsn: Lsn,
     255      1493618 :         in_memory_layer: Option<InMemoryLayerDesc>,
     256      1493618 :         delta_coverage: Iter,
     257      1493618 :         image_coverage: Iter,
     258      1493618 :     ) -> Self {
     259      1493618 :         Self {
     260      1493618 :             in_memory_layer,
     261      1493618 :             delta_coverage: delta_coverage.peekable(),
     262      1493618 :             image_coverage: image_coverage.peekable(),
     263      1493618 :             key_range,
     264      1493618 :             end_lsn,
     265      1493618 :             current_delta: None,
     266      1493618 :             current_image: None,
     267      1493618 :             result: RangeSearchResult::new(),
     268      1493618 :         }
     269      1493618 :     }
     270              : 
     271              :     /// Run the collector. Collection is implemented via a two-pointer algorithm.
     272              :     /// One pointer tracks the start of the current range and the other tracks
     273              :     /// the beginning of the next range which will overlap with the next change
     274              :     /// in coverage across both image and delta.
     275      1493618 :     fn collect(mut self) -> RangeSearchResult {
     276      1493618 :         let next_layer_type = self.choose_next_layer_type();
     277      1484718 :         let mut current_range_start = match next_layer_type {
     278              :             None => {
     279              :                 // No changes for the range
     280         8900 :                 self.pad_range(self.key_range.clone());
     281         8900 :                 return self.result;
     282              :             }
     283      1484718 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     284            0 :                 // Changes only after the end of the range
     285            0 :                 self.pad_range(self.key_range.clone());
     286            0 :                 return self.result;
     287              :             }
     288      1484718 :             Some(layer_type) => {
     289      1484718 :                 // Changes for the range exist.
     290      1484718 :                 let coverage_start = layer_type.next_change_at_key();
     291      1484718 :                 let range_before = self.key_range.start..coverage_start;
     292      1484718 :                 self.pad_range(range_before);
     293      1484718 : 
     294      1484718 :                 self.advance(&layer_type);
     295      1484718 :                 coverage_start
     296              :             }
     297              :         };
     298              : 
     299      2997992 :         while current_range_start < self.key_range.end {
     300      1513274 :             let next_layer_type = self.choose_next_layer_type();
     301      1513274 :             match next_layer_type {
     302        28556 :                 Some(t) => {
     303        28556 :                     let current_range_end = t.next_change_at_key();
     304        28556 :                     self.add_range(current_range_start..current_range_end);
     305        28556 :                     current_range_start = current_range_end;
     306        28556 : 
     307        28556 :                     self.advance(&t);
     308        28556 :                 }
     309      1484718 :                 None => {
     310      1484718 :                     self.add_range(current_range_start..self.key_range.end);
     311      1484718 :                     current_range_start = self.key_range.end;
     312      1484718 :                 }
     313              :             }
     314              :         }
     315              : 
     316      1484718 :         self.result
     317      1493618 :     }
     318              : 
     319              :     /// Map a range which does not intersect any persistent layers to
     320              :     /// the in-memory layer candidate.
     321      1495494 :     fn pad_range(&mut self, key_range: Range<Key>) {
     322      1495494 :         if !key_range.is_empty() {
     323        14720 :             if let Some(ref inmem) = self.in_memory_layer {
     324         6348 :                 let search_result = SearchResult {
     325         6348 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem.clone()),
     326         6348 :                     lsn_floor: inmem.get_lsn_range().start,
     327         6348 :                 };
     328         6348 : 
     329         6348 :                 self.result
     330         6348 :                     .found
     331         6348 :                     .entry(search_result)
     332         6348 :                     .or_default()
     333         6348 :                     .add_range(key_range);
     334         8372 :             }
     335      1480774 :         }
     336      1495494 :     }
     337              : 
     338              :     /// Select the appropriate layer for the given range and update
     339              :     /// the collector.
     340      1513274 :     fn add_range(&mut self, covered_range: Range<Key>) {
     341      1513274 :         let selected = LayerMap::select_layer(
     342      1513274 :             self.current_delta.clone(),
     343      1513274 :             self.current_image.clone(),
     344      1513274 :             self.in_memory_layer.clone(),
     345      1513274 :             self.end_lsn,
     346      1513274 :         );
     347      1513274 : 
     348      1513274 :         match selected {
     349      1511398 :             Some(search_result) => self
     350      1511398 :                 .result
     351      1511398 :                 .found
     352      1511398 :                 .entry(search_result)
     353      1511398 :                 .or_default()
     354      1511398 :                 .add_range(covered_range),
     355         1876 :             None => self.pad_range(covered_range),
     356              :         }
     357      1513274 :     }
     358              : 
     359              :     /// Move to the next coverage change.
     360      1513274 :     fn advance(&mut self, layer_type: &NextLayerType) {
     361      1513274 :         match layer_type {
     362       156015 :             NextLayerType::Delta(_) => {
     363       156015 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     364       156015 :                 self.current_delta = layer;
     365       156015 :             }
     366       605134 :             NextLayerType::Image(_) => {
     367       605134 :                 let (_, layer) = self.image_coverage.next().unwrap();
     368       605134 :                 self.current_image = layer;
     369       605134 :             }
     370       752125 :             NextLayerType::Both(_) => {
     371       752125 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     372       752125 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     373       752125 : 
     374       752125 :                 self.current_image = image_layer;
     375       752125 :                 self.current_delta = delta_layer;
     376       752125 :             }
     377              :         }
     378      1513274 :     }
     379              : 
     380              :     /// Pick the next coverage change: the one at the smaller key, or both if they're aligned.
     381      3006892 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     382      3006892 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     383      3006892 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     384      3006892 : 
     385      3006892 :         match (next_delta_at, next_image_at) {
     386      1493618 :             (None, None) => None,
     387       145931 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     388       603390 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     389       763953 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     390         1744 :                 Some(NextLayerType::Image(*next_image_at))
     391              :             }
     392       762209 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     393        10084 :                 Some(NextLayerType::Delta(*next_delta_at))
     394              :             }
     395       752125 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     396              :         }
     397      3006892 :     }
     398              : }
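
A minimal, self-contained sketch of the change-point merge that `choose_next_layer_type` and `advance` perform together (assuming two ascending slices of change keys; the real collector also carries the current delta/image layer on each side and classifies every sub-range through `LayerMap::select_layer`):

    fn merge_change_points(delta_changes: &[i128], image_changes: &[i128]) -> Vec<i128> {
        let (mut d, mut i) = (0, 0);
        let mut out = Vec::new();
        while d < delta_changes.len() || i < image_changes.len() {
            let next = match (delta_changes.get(d), image_changes.get(i)) {
                (Some(&dk), Some(&ik)) if dk < ik => { d += 1; dk }
                (Some(&dk), Some(&ik)) if ik < dk => { i += 1; ik }
                (Some(&dk), Some(_)) => { d += 1; i += 1; dk } // aligned: advance both
                (Some(&dk), None) => { d += 1; dk }
                (None, Some(&ik)) => { i += 1; ik }
                (None, None) => unreachable!(),
            };
            // Each merged change key starts a new sub-range to classify.
            out.push(next);
        }
        out
    }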
     399              : 
     400              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     401              : pub struct InMemoryLayerDesc {
     402              :     handle: InMemoryLayerHandle,
     403              :     lsn_range: Range<Lsn>,
     404              : }
     405              : 
     406              : impl InMemoryLayerDesc {
     407      2515187 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     408      2515187 :         self.lsn_range.clone()
     409      2515187 :     }
     410              : }
     411              : 
     412              : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
     413              : enum InMemoryLayerHandle {
     414              :     Open,
     415              :     Frozen(usize),
     416              : }
     417              : 
     418              : impl LayerMap {
     419              :     ///
     420              :     /// Find the latest layer (by lsn.end) that covers the given
     421              :     /// 'key', with lsn.start < 'end_lsn'.
     422              :     ///
     423              :     /// The caller of this function is the page reconstruction
     424              :     /// algorithm looking for the next relevant delta layer, or
     425              :     /// the terminal image layer. The caller will pass the lsn_floor
     426              :     /// value as end_lsn in the next call to search.
     427              :     ///
     428              :     /// If there's an image layer exactly below the given end_lsn,
     429              :     /// search should return that layer regardless of whether there are
     430              :     /// overlapping deltas.
     431              :     ///
     432              :     /// If the latest layer is a delta and there is an overlapping
     433              :     /// image with it below, the lsn_floor returned should be right
     434              :     /// above that image so we don't skip it in the search. Otherwise
     435              :     /// the lsn_floor returned should be the bottom of the delta layer
     436              :     /// because we should make as much progress down the lsn axis
     437              :     /// as possible. It's fine if we skip some overlapping deltas this
     438              :     /// way, because the delta we returned would contain the same
     439              :     /// WAL content.
     440              :     ///
     441              :     /// TODO: This API is convoluted and inefficient. If the caller
     442              :     /// makes N search calls, we'll end up finding the same latest
     443              :     /// image layer N times. We should either cache the latest image
     444              :     /// layer result, or simplify the api to `get_latest_image` and
     445              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     446              :     ///
     447       287924 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     448       287924 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     449              : 
     450       287924 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     451       287924 :             Some(version) => version,
     452              :             None => {
     453            0 :                 return in_memory_layer.map(|desc| SearchResult {
     454            0 :                     lsn_floor: desc.get_lsn_range().start,
     455            0 :                     layer: ReadableLayerWeak::InMemoryLayer(desc),
     456            0 :                 });
     457              :             }
     458              :         };
     459              : 
     460       287924 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     461       287924 :         let latest_image = version.image_coverage.query(key.to_i128());
     462       287924 : 
     463       287924 :         Self::select_layer(latest_delta, latest_image, in_memory_layer, end_lsn)
     464       287924 :     }
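
A hypothetical read loop illustrating the contract above, assuming a `layer_map: &LayerMap`, a `key: Key` and an initial `end_lsn: Lsn` are in scope (how records are actually read from each layer is elided):

    let mut cursor = end_lsn;
    while let Some(SearchResult { layer, lsn_floor }) = layer_map.search(key, cursor) {
        // ... collect the page image / WAL records held by `layer` ...
        let found_image =
            matches!(&layer, ReadableLayerWeak::PersistentLayer(desc) if !desc.is_delta());
        if found_image || lsn_floor == Lsn(0) {
            break; // an image (or the start of history) terminates the descent
        }
        cursor = lsn_floor; // keep searching below the layer we just found
    }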
     465              : 
     466              :     /// Select a layer from three potential candidates (in-memory, delta and image layer).
     467              :     /// The candidates represent the first layer of each type which intersect a key range.
     468              :     ///
     469              :     /// Layer types have an implicit priority (image > delta > in-memory). For instance,
     470              :     /// if we have the option of reading an LSN range from both an image and a delta, we
     471              :     /// should read from the image.
     472      2698862 :     fn select_layer(
     473      2698862 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     474      2698862 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     475      2698862 :         in_memory_layer: Option<InMemoryLayerDesc>,
     476      2698862 :         end_lsn: Lsn,
     477      2698862 :     ) -> Option<SearchResult> {
     478      2698862 :         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
     479      2698862 :         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
     480              : 
     481      2698862 :         match (delta_layer, image_layer, in_memory_layer) {
     482        23200 :             (None, None, None) => None,
     483        73656 :             (None, Some(image), None) => {
     484        73656 :                 let lsn_floor = image.get_lsn_range().start;
     485        73656 :                 Some(SearchResult {
     486        73656 :                     layer: ReadableLayerWeak::PersistentLayer(image),
     487        73656 :                     lsn_floor,
     488        73656 :                 })
     489              :             }
     490       153627 :             (Some(delta), None, None) => {
     491       153627 :                 let lsn_floor = delta.get_lsn_range().start;
     492       153627 :                 Some(SearchResult {
     493       153627 :                     layer: ReadableLayerWeak::PersistentLayer(delta),
     494       153627 :                     lsn_floor,
     495       153627 :                 })
     496              :             }
     497       842861 :             (Some(delta), Some(image), None) => {
     498       842861 :                 let img_lsn = image.get_lsn_range().start;
     499       842861 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     500       842861 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     501       842861 :                 if image_is_newer || image_exact_match {
     502        82388 :                     Some(SearchResult {
     503        82388 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     504        82388 :                         lsn_floor: img_lsn,
     505        82388 :                     })
     506              :                 } else {
     507              :                     // If the delta overlaps with the image in the LSN dimension, do a partial
     508              :                     // read up to the image layer.
     509       760473 :                     let lsn_floor =
     510       760473 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     511       760473 :                     Some(SearchResult {
     512       760473 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     513       760473 :                         lsn_floor,
     514       760473 :                     })
     515              :                 }
     516              :             }
     517        23164 :             (None, None, Some(inmem)) => {
     518        23164 :                 let lsn_floor = inmem.get_lsn_range().start;
     519        23164 :                 Some(SearchResult {
     520        23164 :                     layer: ReadableLayerWeak::InMemoryLayer(inmem),
     521        23164 :                     lsn_floor,
     522        23164 :                 })
     523              :             }
     524       685326 :             (None, Some(image), Some(inmem)) => {
     525       685326 :                 // If the in-memory layer overlaps with the image in the LSN dimension, do a partial
     526       685326 :                 // read up to the image layer.
     527       685326 :                 let img_lsn = image.get_lsn_range().start;
     528       685326 :                 let image_is_newer = image.get_lsn_range().end >= inmem.get_lsn_range().end;
     529       685326 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     530       685326 :                 if image_is_newer || image_exact_match {
     531           20 :                     Some(SearchResult {
     532           20 :                         layer: ReadableLayerWeak::PersistentLayer(image),
     533           20 :                         lsn_floor: img_lsn,
     534           20 :                     })
     535              :                 } else {
     536       685306 :                     let lsn_floor =
     537       685306 :                         std::cmp::max(inmem.get_lsn_range().start, image.get_lsn_range().start + 1);
     538       685306 :                     Some(SearchResult {
     539       685306 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     540       685306 :                         lsn_floor,
     541       685306 :                     })
     542              :                 }
     543              :             }
     544       448244 :             (Some(delta), None, Some(inmem)) => {
     545       448244 :                 // Overlaps between delta and in-memory layers are not a valid
     546       448244 :                 // state, but we handle them here for completeness.
     547       448244 :                 let delta_end = delta.get_lsn_range().end;
     548       448244 :                 let delta_is_newer = delta_end >= inmem.get_lsn_range().end;
     549       448244 :                 let delta_exact_match = delta_end == end_lsn;
     550       448244 :                 if delta_is_newer || delta_exact_match {
     551           16 :                     Some(SearchResult {
     552           16 :                         lsn_floor: delta.get_lsn_range().start,
     553           16 :                         layer: ReadableLayerWeak::PersistentLayer(delta),
     554           16 :                     })
     555              :                 } else {
     556              :                     // If the in-memory layer overlaps with the delta in the LSN dimension, do a partial
     557              :                     // read up to the delta layer.
     558       448228 :                     let lsn_floor =
     559       448228 :                         std::cmp::max(inmem.get_lsn_range().start, delta.get_lsn_range().end);
     560       448228 :                     Some(SearchResult {
     561       448228 :                         layer: ReadableLayerWeak::InMemoryLayer(inmem),
     562       448228 :                         lsn_floor,
     563       448228 :                     })
     564              :                 }
     565              :             }
     566       448784 :             (Some(delta), Some(image), Some(inmem)) => {
     567       448784 :                 // Determine the preferred persistent layer without taking the in-memory layer
     568       448784 :                 // into consideration.
     569       448784 :                 let persistent_res =
     570       448784 :                     Self::select_layer(Some(delta.clone()), Some(image.clone()), None, end_lsn)
     571       448784 :                         .unwrap();
     572       448784 :                 let persistent_l = match persistent_res.layer {
     573       448784 :                     ReadableLayerWeak::PersistentLayer(l) => l,
     574            0 :                     ReadableLayerWeak::InMemoryLayer(_) => unreachable!(),
     575              :                 };
     576              : 
     577              :                 // Now handle the in-memory layer overlaps.
     578       448784 :                 let inmem_res = if persistent_l.is_delta() {
     579       428296 :                     Self::select_layer(Some(persistent_l), None, Some(inmem.clone()), end_lsn)
     580       428296 :                         .unwrap()
     581              :                 } else {
     582        20488 :                     Self::select_layer(None, Some(persistent_l), Some(inmem.clone()), end_lsn)
     583        20488 :                         .unwrap()
     584              :                 };
     585              : 
     586       448784 :                 Some(SearchResult {
     587       448784 :                     layer: inmem_res.layer,
     588       448784 :                     // Use the more restrictive LSN floor
     589       448784 :                     lsn_floor: std::cmp::max(persistent_res.lsn_floor, inmem_res.lsn_floor),
     590       448784 :                 })
     591              :             }
     592              :         }
     593      2698862 :     }
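
To make the priority concrete, a hypothetical example: with a delta covering LSNs 0x20..0x40, an image taken at LSN 0x30 (range 0x30..0x31), and end_lsn = 0x40, the delta is returned but its lsn_floor is clamped to 0x31, just above the image. The follow-up search at end_lsn = 0x31 then hits the image_exact_match case and returns the image with lsn_floor 0x30, terminating the descent.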
     594              : 
     595      2164986 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     596      2164986 :         let in_memory_layer = self.search_in_memory_layer(end_lsn);
     597              : 
     598      2164986 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     599      1493618 :             Some(version) => version,
     600              :             None => {
     601       671368 :                 return RangeSearchResult::map_to_in_memory_layer(in_memory_layer, key_range);
     602              :             }
     603              :         };
     604              : 
     605      1493618 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     606      1493618 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     607      1493618 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     608      1493618 : 
     609      1493618 :         let collector = RangeSearchCollector::new(
     610      1493618 :             key_range,
     611      1493618 :             end_lsn,
     612      1493618 :             in_memory_layer,
     613      1493618 :             delta_changes,
     614      1493618 :             image_changes,
     615      1493618 :         );
     616      1493618 :         collector.collect()
     617      2164986 :     }
     618              : 
     619              :     /// Start a batch of updates, applied on drop
     620         3584 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     621         3584 :         BatchedUpdates { layer_map: self }
     622         3584 :     }
     623              : 
     624              :     ///
     625              :     /// Insert an on-disk layer
     626              :     ///
     627              :     /// Helper function for BatchedUpdates::insert_historic
     628              :     ///
     629              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     630         9828 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     631         9828 :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     632         9828 : 
     633         9828 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     634         1996 :             self.l0_delta_layers.push(layer_desc.clone().into());
     635         1996 :             self.watch_l0_deltas
     636         1996 :                 .send_replace(self.l0_delta_layers.len());
     637         7832 :         }
     638              : 
     639         9828 :         self.historic.insert(
     640         9828 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     641         9828 :             layer_desc.into(),
     642         9828 :         );
     643         9828 :     }
     644              : 
     645              :     ///
     646              :     /// Remove an on-disk layer from the map.
     647              :     ///
     648              :     /// Helper function for BatchedUpdates::remove_historic
     649              :     ///
     650         1032 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     651         1032 :         self.historic
     652         1032 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     653         1032 :         let layer_key = layer_desc.key();
     654         1032 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     655          808 :             let len_before = self.l0_delta_layers.len();
     656          808 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     657        11048 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     658          808 :             self.l0_delta_layers = l0_delta_layers;
     659          808 :             self.watch_l0_deltas
     660          808 :                 .send_replace(self.l0_delta_layers.len());
     661          808 :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     662          808 :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     663          808 :             // vtable) pairs.
     664          808 :             assert_eq!(
     665          808 :                 self.l0_delta_layers.len(),
     666          808 :                 len_before - 1,
     667            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     668              :             );
     669          224 :         }
     670         1032 :     }
     671              : 
     672              :     /// Helper function for BatchedUpdates::drop.
     673         3588 :     pub(self) fn flush_updates(&mut self) {
     674         3588 :         self.historic.rebuild();
     675         3588 :     }
     676              : 
     677              :     /// Is there a newer image layer for given key- and LSN-range? Or a set
     678              :     /// of image layers within the specified lsn range that cover the entire
     679              :     /// specified key range?
     680              :     ///
     681              :     /// This is used for garbage collection, to determine if an old layer can
     682              :     /// be deleted.
     683           16 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     684           16 :         if key.is_empty() {
     685              :             // Vacuously true. There's a newer image for all 0 of the keys in the range.
     686            0 :             return true;
     687           16 :         }
     688              : 
     689           16 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     690           16 :             Some(v) => v,
     691            0 :             None => return false,
     692              :         };
     693              : 
     694           16 :         let start = key.start.to_i128();
     695           16 :         let end = key.end.to_i128();
     696           16 : 
     697           20 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     698           20 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     699            0 :             None => false,
     700           20 :         };
     701              : 
     702              :         // Check the start is covered
     703           16 :         if !layer_covers(version.image_coverage.query(start)) {
     704           12 :             return false;
     705            4 :         }
     706              : 
     707              :         // Check after all changes of coverage
     708            4 :         for (_, change_val) in version.image_coverage.range(start..end) {
     709            4 :             if !layer_covers(change_val) {
     710            0 :                 return false;
     711            4 :             }
     712              :         }
     713              : 
     714            4 :         true
     715           16 :     }
     716              : 
     717         7336 :     pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
     718         7336 :         self.historic.iter()
     719         7336 :     }
     720              : 
     721              :     /// Get a descriptor for the most recent in-memory layer (open or frozen) whose LSN range starts below the given LSN.
     722      2452910 :     pub(crate) fn search_in_memory_layer(&self, below: Lsn) -> Option<InMemoryLayerDesc> {
     723      2452910 :         let is_below = |l: &Arc<InMemoryLayer>| {
     724      1981153 :             let start_lsn = l.get_lsn_range().start;
     725      1981153 :             below > start_lsn
     726      1981153 :         };
     727              : 
     728      2452910 :         if let Some(open) = &self.open_layer {
     729      1823441 :             if is_below(open) {
     730      1208529 :                 return Some(InMemoryLayerDesc {
     731      1208529 :                     handle: InMemoryLayerHandle::Open,
     732      1208529 :                     lsn_range: open.get_lsn_range(),
     733      1208529 :                 });
     734       614912 :             }
     735       629469 :         }
     736              : 
     737      1244381 :         self.frozen_layers
     738      1244381 :             .iter()
     739      1244381 :             .enumerate()
     740      1244381 :             .rfind(|(_idx, l)| is_below(l))
     741      1244381 :             .map(|(idx, l)| InMemoryLayerDesc {
     742       156864 :                 handle: InMemoryLayerHandle::Frozen(idx),
     743       156864 :                 lsn_range: l.get_lsn_range(),
     744      1244381 :             })
     745      2452910 :     }
     746              : 
     747      1214349 :     pub(crate) fn in_memory_layer(&self, desc: &InMemoryLayerDesc) -> Arc<InMemoryLayer> {
     748      1214349 :         match desc.handle {
     749      1208529 :             InMemoryLayerHandle::Open => self.open_layer.as_ref().unwrap().clone(),
     750         5820 :             InMemoryLayerHandle::Frozen(idx) => self.frozen_layers[idx].clone(),
     751              :         }
     752      1214349 :     }
     753              : 
     754              :     ///
     755              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     756              :     /// image layer that covers each range at the specified lsn (inclusive).
     757              :     /// This is used when creating new image layers.
     758           28 :     pub fn image_coverage(
     759           28 :         &self,
     760           28 :         key_range: &Range<Key>,
     761           28 :         lsn: Lsn,
     762           28 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     763           28 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     764           28 :             Some(v) => v,
     765            0 :             None => return vec![],
     766              :         };
     767              : 
     768           28 :         let start = key_range.start.to_i128();
     769           28 :         let end = key_range.end.to_i128();
     770           28 : 
     771           28 :         // Initialize loop variables
     772           28 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     773           28 :         let mut current_key = start;
     774           28 :         let mut current_val = version.image_coverage.query(start);
     775              : 
     776              :         // Loop through the change events and push intervals
     777           28 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     778            0 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     779            0 :             coverage.push((kr, current_val.take()));
     780            0 :             current_key = change_key;
     781            0 :             current_val.clone_from(&change_val);
     782            0 :         }
     783              : 
     784              :         // Add the final interval
     785           28 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     786           28 :         coverage.push((kr, current_val.take()));
     787           28 : 
     788           28 :         coverage
     789           28 :     }
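
A hypothetical caller collecting the sub-ranges that still lack an image, assuming a `key_range: Range<Key>` and an `lsn: Lsn` are in scope:

    let mut uncovered: Vec<Range<Key>> = Vec::new();
    for (subrange, image) in layer_map.image_coverage(&key_range, lsn) {
        if image.is_none() {
            // no image covers this sub-range at or below `lsn`:
            // it is a candidate for creating a new image layer
            uncovered.push(subrange);
        }
    }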
     790              : 
     791              :     /// Check if the key range resembles that of an L0 layer.
     792        17119 :     pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
     793        17119 :         is_delta_layer && key_range == &(Key::MIN..Key::MAX)
     794        17119 :     }
     795              : 
     796              :     /// This function determines which layers are counted in `count_deltas`:
     797              :     /// layers that should count towards deciding whether or not to reimage
     798              :     /// a certain partition range.
     799              :     ///
     800              :     /// There are two kinds of layers we currently consider reimage-worthy:
     801              :     ///
     802              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     803              :     /// TODO Some of these layers are very sparse and cover the entire key
     804              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     805              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     806              :     ///      based on some of these factors:
     807              :     ///      a) whether this layer has any wal in this partition range
     808              :     ///      b) the size of the layer
     809              :     ///      c) the number of images needed to cover it
     810              :     ///      d) the estimated time until we'll have to reimage over it for GC
     811              :     ///
     812              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     813              :     /// them reimage-worthy only when the entire key space can be covered by very few
     814              :     /// images (currently 1).
     815              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     816              :     ///      implement that we need to plumb a lot more context into this function
     817              :     ///      than just the current partition_range.
     818            0 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     819            0 :         // Case 1
     820            0 :         if !Self::is_l0(&layer.key_range, layer.is_delta) {
     821            0 :             return true;
     822            0 :         }
     823            0 : 
     824            0 :         // Case 2
     825            0 :         if partition_range == &(Key::MIN..Key::MAX) {
     826            0 :             return true;
     827            0 :         }
     828            0 : 
     829            0 :         false
     830            0 :     }
     831              : 
     832              :     /// Count the height of the tallest stack of reimage-worthy deltas
     833              :     /// in this 2d region.
     834              :     ///
     835              :     /// If `limit` is provided we don't try to count above that number.
     836              :     ///
     837              :     /// This number is used to compute the largest number of deltas that
     838              :     /// we'll need to visit for any page reconstruction in this region.
     839              :     /// We use this heuristic to decide whether to create an image layer.
     840           28 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     841           28 :         // We get the delta coverage of the region, and for each part of the coverage
     842           28 :         // we recurse right underneath the delta. The recursion depth is limited by
     843           28 :         // the largest result this function could return, which is in practice between
     844           28 :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     845           28 : 
     846           28 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     847            0 :             return 0;
     848           28 :         }
     849              : 
     850           28 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     851           28 :             Some(v) => v,
     852            0 :             None => return 0,
     853              :         };
     854              : 
     855           28 :         let start = key.start.to_i128();
     856           28 :         let end = key.end.to_i128();
     857           28 : 
     858           28 :         // Initialize loop variables
     859           28 :         let mut max_stacked_deltas = 0;
     860           28 :         let mut current_key = start;
     861           28 :         let mut current_val = version.delta_coverage.query(start);
     862              : 
     863              :         // Loop through the delta coverage and recurse on each part
     864           28 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     865              :             // If there's a relevant delta in this part, add 1 and recurse down
     866            0 :             if let Some(val) = &current_val {
     867            0 :                 if val.get_lsn_range().end > lsn.start {
     868            0 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     869            0 :                     let lr = lsn.start..val.get_lsn_range().start;
     870            0 :                     if !kr.is_empty() {
     871            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
     872            0 :                         let new_limit = limit.map(|l| l - base_count);
     873            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     874            0 :                         max_stacked_deltas = std::cmp::max(
     875            0 :                             max_stacked_deltas,
     876            0 :                             base_count + max_stacked_deltas_underneath,
     877            0 :                         );
     878            0 :                     }
     879            0 :                 }
     880            0 :             }
     881              : 
     882            0 :             current_key = change_key;
     883            0 :             current_val.clone_from(&change_val);
     884              :         }
     885              : 
     886              :         // Consider the last part
     887           28 :         if let Some(val) = &current_val {
     888            0 :             if val.get_lsn_range().end > lsn.start {
     889            0 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     890            0 :                 let lr = lsn.start..val.get_lsn_range().start;
     891            0 : 
     892            0 :                 if !kr.is_empty() {
     893            0 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     894            0 :                     let new_limit = limit.map(|l| l - base_count);
     895            0 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     896            0 :                     max_stacked_deltas = std::cmp::max(
     897            0 :                         max_stacked_deltas,
     898            0 :                         base_count + max_stacked_deltas_underneath,
     899            0 :                     );
     900            0 :                 }
     901            0 :             }
     902           28 :         }
     903              : 
     904           28 :         max_stacked_deltas
     905           28 :     }
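    // Hedged usage sketch (not part of this file's real API): this is roughly how a
    // caller deciding on image-layer creation could consume `count_deltas`. The
    // `threshold` parameter and the method name are illustrative only.
    #[allow(unused)]
    fn needs_image_layer_sketch(&self, key: &Range<Key>, lsn: &Range<Lsn>, threshold: usize) -> bool {
        // Passing `Some(threshold)` bounds the recursion: counting stops once
        // `threshold` reimage-worthy deltas have been found along one stack.
        self.count_deltas(key, lsn, Some(threshold)) >= threshold
    }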
     906              : 
     907              :     /// Return all L0 delta layers
     908         3312 :     pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
     909         3312 :         &self.l0_delta_layers
     910         3312 :     }
     911              : 
     912              :     /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
     913          892 :     pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
     914          892 :         self.watch_l0_deltas.subscribe()
     915          892 :     }
     916              : 
      917              :     /// Debugging function to print out the contents of the layer map.
     918              :     #[allow(unused)]
     919            4 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     920            4 :         println!("Begin dump LayerMap");
     921            4 : 
     922            4 :         println!("open_layer:");
     923            4 :         if let Some(open_layer) = &self.open_layer {
     924            0 :             open_layer.dump(verbose, ctx).await?;
     925            4 :         }
     926              : 
     927            4 :         println!("frozen_layers:");
     928            4 :         for frozen_layer in self.frozen_layers.iter() {
     929            0 :             frozen_layer.dump(verbose, ctx).await?;
     930              :         }
     931              : 
     932            4 :         println!("historic_layers:");
     933           24 :         for desc in self.iter_historic_layers() {
     934           24 :             desc.dump();
     935           24 :         }
     936            4 :         println!("End dump LayerMap");
     937            4 :         Ok(())
     938            4 :     }
     939              : 
     940              :     /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
     941              :     /// where we expect to serve reads.
     942              :     ///
     943              :     /// This function is O(N) and should be called infrequently.  The caller is responsible for
     944              :     /// looking up and updating the Layer objects for these layer descriptors.
     945          476 :     pub fn get_visibility(
     946          476 :         &self,
     947          476 :         mut read_points: Vec<Lsn>,
     948          476 :     ) -> (
     949          476 :         Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
     950          476 :         KeySpace,
     951          476 :     ) {
     952              :         // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
     953              :         // KeySpace is intended to be composed statically and iterated over.
     954              :         struct KeyShadow {
     955              :             // Map of range start to range end
     956              :             inner: RangeSetBlaze<i128>,
     957              :         }
     958              : 
     959              :         impl KeyShadow {
     960          476 :             fn new() -> Self {
     961          476 :                 Self {
     962          476 :                     inner: Default::default(),
     963          476 :                 }
     964          476 :             }
     965              : 
     966         3204 :             fn contains(&self, range: Range<Key>) -> bool {
     967         3204 :                 let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
     968         3204 :                 self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
     969         3204 :                     CheckSortedDisjoint::from([range_incl]),
     970         3204 :                 ))
     971         3204 :             }
     972              : 
     973              :             /// Add the input range to the keys covered by self.
     974              :             ///
     975              :             /// Return true if inserting this range covered some keys that were previously not covered
     976         3820 :             fn cover(&mut self, insert: Range<Key>) -> bool {
     977         3820 :                 let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
     978         3820 :                 self.inner.ranges_insert(range_incl)
     979         3820 :             }
     980              : 
     981          484 :             fn reset(&mut self) {
     982          484 :                 self.inner = Default::default();
     983          484 :             }
     984              : 
     985          476 :             fn to_keyspace(&self) -> KeySpace {
     986          476 :                 let mut accum = KeySpaceAccum::new();
     987          484 :                 for range_incl in self.inner.ranges() {
     988          484 :                     let range = Range {
     989          484 :                         start: Key::from_i128(*range_incl.start()),
     990          484 :                         end: Key::from_i128(range_incl.end() + 1),
     991          484 :                     };
     992          484 :                     accum.add_range(range)
     993              :                 }
     994              : 
     995          476 :                 accum.to_keyspace()
     996          476 :             }
     997              :         }
     998              : 
      999              :         // The 'shadow' is updated as we sweep the layers from highest LSN downwards: an image layer
     1000              :         // adds its key range to the shadow (covering the layers beneath it), and a ReadPoint resets the shadow.
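        // Worked example (illustrative): if one image layer covers keys 0..10 and a
        // lower-LSN image covers 5..15, the shadow grows to the single range 0..15; a
        // delta spanning keys 3..12 beneath them then reports `contains == true` and is
        // deferred as maybe-covered, resolving to Visible if a read point falls inside
        // its LSN range and to Covered otherwise.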
    1001          476 :         read_points.sort_by_key(|rp| rp.0);
    1002          476 :         let mut shadow = KeyShadow::new();
    1003              : 
    1004              :         // We will interleave all our read points and layers into a sorted collection
    1005              :         enum Item {
    1006              :             ReadPoint { lsn: Lsn },
    1007              :             Layer(Arc<PersistentLayerDesc>),
    1008              :         }
    1009              : 
    1010          476 :         let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
    1011          476 :         items.extend(self.iter_historic_layers().map(Item::Layer));
    1012          476 :         items.extend(
    1013          476 :             read_points
    1014          476 :                 .into_iter()
    1015          484 :                 .map(|rp| Item::ReadPoint { lsn: rp }),
    1016          476 :         );
    1017          476 : 
    1018          476 :         // Ordering: we want to iterate like this:
    1019          476 :         // 1. Highest LSNs first
    1020          476 :         // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
    1021          476 :         // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
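        // Worked example (illustrative): for a read point at LSN 100, an image layer at
        // LSN 100 and a delta layer ending at LSN 101 (sort key LSN 100), the reverse
        // sort yields (100, 2), (100, 1), (100, 0): the read point is visited first,
        // then the image, then the delta.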
    1022       127176 :         items.sort_by_key(|item| {
    1023       127176 :             std::cmp::Reverse(match item {
    1024       126280 :                 Item::Layer(layer) => {
    1025       126280 :                     if layer.is_delta() {
    1026        59256 :                         (Lsn(layer.get_lsn_range().end.0 - 1), 0)
    1027              :                     } else {
    1028        67024 :                         (layer.image_layer_lsn(), 1)
    1029              :                     }
    1030              :                 }
    1031          896 :                 Item::ReadPoint { lsn } => (*lsn, 2),
    1032              :             })
    1033       127176 :         });
    1034          476 : 
    1035          476 :         let mut results = Vec::with_capacity(self.historic.len());
    1036          476 : 
    1037          476 :         let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
    1038              : 
    1039         7984 :         for item in items {
    1040         7508 :             let (reached_lsn, is_readpoint) = match &item {
    1041          484 :                 Item::ReadPoint { lsn } => (lsn, true),
    1042         7024 :                 Item::Layer(layer) => (&layer.lsn_range.start, false),
    1043              :             };
    1044         7508 :             maybe_covered_deltas.retain(|d| {
    1045          212 :                 if *reached_lsn >= d.lsn_range.start && is_readpoint {
    1046              :                     // We encountered a readpoint within the delta layer: it is visible
    1047              : 
    1048            4 :                     results.push((d.clone(), LayerVisibilityHint::Visible));
    1049            4 :                     false
    1050          208 :                 } else if *reached_lsn < d.lsn_range.start {
    1051              :                     // We passed the layer's range without encountering a read point: it is not visible
    1052           64 :                     results.push((d.clone(), LayerVisibilityHint::Covered));
    1053           64 :                     false
    1054              :                 } else {
    1055              :                     // We're still in the delta layer: continue iterating
    1056          144 :                     true
    1057              :                 }
    1058         7508 :             });
    1059         7508 : 
    1060         7508 :             match item {
    1061          484 :                 Item::ReadPoint { lsn: _lsn } => {
    1062          484 :                     // TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
    1063          484 :                     // to assume that the whole key range is visible at the branch point.
    1064          484 :                     shadow.reset();
    1065          484 :                 }
    1066         7024 :                 Item::Layer(layer) => {
    1067         7024 :                     let visibility = if layer.is_delta() {
    1068         3204 :                         if shadow.contains(layer.get_key_range()) {
    1069              :                             // If a layer isn't visible based on current state, we must defer deciding whether
    1070              :                             // it is truly not visible until we have advanced past the delta's range: we might
    1071              :                             // encounter another branch point within this delta layer's LSN range.
    1072           76 :                             maybe_covered_deltas.push(layer);
    1073           76 :                             continue;
    1074              :                         } else {
    1075         3128 :                             LayerVisibilityHint::Visible
    1076              :                         }
    1077              :                     } else {
    1078         3820 :                         let modified = shadow.cover(layer.get_key_range());
    1079         3820 :                         if modified {
    1080              :                             // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
    1081         3748 :                             LayerVisibilityHint::Visible
    1082              :                         } else {
    1083              :                             // An image layer in a region that was already covered
    1084           72 :                             LayerVisibilityHint::Covered
    1085              :                         }
    1086              :                     };
    1087              : 
    1088         6948 :                     results.push((layer, visibility));
    1089              :                 }
    1090              :             }
    1091              :         }
    1092              : 
    1093              :         // Drain any remaining maybe_covered deltas
    1094          476 :         results.extend(
    1095          476 :             maybe_covered_deltas
    1096          476 :                 .into_iter()
    1097          476 :                 .map(|d| (d, LayerVisibilityHint::Covered)),
    1098          476 :         );
    1099          476 : 
    1100          476 :         (results, shadow.to_keyspace())
    1101          476 :     }
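    // Hedged illustration (not a method that exists in this file): a consumer of
    // `get_visibility` might sum the sizes of layers reported as visible, e.g. to feed
    // heatmap or eviction decisions. Supplying the right read points (branch points
    // plus the timeline tip) remains the caller's responsibility.
    #[allow(unused)]
    fn visible_bytes_sketch(&self, read_points: Vec<Lsn>) -> u64 {
        let (layer_visibilities, _shadow) = self.get_visibility(read_points);
        layer_visibilities
            .into_iter()
            .filter(|(_, visibility)| matches!(visibility, LayerVisibilityHint::Visible))
            .map(|(desc, _)| desc.file_size)
            .sum()
    }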
    1102              : }
    1103              : 
    1104              : #[cfg(test)]
    1105              : mod tests {
    1106              :     use std::collections::HashMap;
    1107              :     use std::path::PathBuf;
    1108              : 
    1109              :     use crate::{
    1110              :         DEFAULT_PG_VERSION,
    1111              :         tenant::{harness::TenantHarness, storage_layer::LayerName},
    1112              :     };
    1113              :     use pageserver_api::key::DBDIR_KEY;
    1114              :     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
    1115              :     use utils::id::{TenantId, TimelineId};
    1116              :     use utils::shard::TenantShardId;
    1117              : 
    1118              :     use super::*;
    1119              :     use crate::tenant::IndexPart;
    1120              : 
    1121              :     #[derive(Clone)]
    1122              :     struct LayerDesc {
    1123              :         key_range: Range<Key>,
    1124              :         lsn_range: Range<Lsn>,
    1125              :         is_delta: bool,
    1126              :     }
    1127              : 
    1128            4 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
    1129            4 :         let mut layer_map = LayerMap::default();
    1130              : 
    1131           24 :         for layer in layers {
    1132           20 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
    1133           20 :                 layer.key_range,
    1134           20 :                 layer.lsn_range,
    1135           20 :                 layer.is_delta,
    1136           20 :             ));
    1137           20 :         }
    1138              : 
    1139            4 :         layer_map.flush_updates();
    1140            4 :         layer_map
    1141            4 :     }
    1142              : 
    1143        14164 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
    1144        14164 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
    1145        14164 :             .found
    1146        14164 :             .into_iter()
    1147        27280 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1148        14164 :             .collect();
    1149        14164 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
    1150        14164 :             .found
    1151        14164 :             .into_iter()
    1152        27280 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1153        14164 :             .collect();
    1154        14164 : 
    1155        14164 :         assert_eq!(lhs, rhs);
    1156        14164 :     }
    1157              : 
    1158              :     #[cfg(test)]
    1159        14160 :     fn brute_force_range_search(
    1160        14160 :         layer_map: &LayerMap,
    1161        14160 :         key_range: Range<Key>,
    1162        14160 :         end_lsn: Lsn,
    1163        14160 :     ) -> RangeSearchResult {
    1164        14160 :         let mut range_search_result = RangeSearchResult::new();
    1165        14160 : 
    1166        14160 :         let mut key = key_range.start;
    1167       302080 :         while key != key_range.end {
    1168       287920 :             let res = layer_map.search(key, end_lsn);
    1169       287920 :             if let Some(res) = res {
    1170       266600 :                 range_search_result
    1171       266600 :                     .found
    1172       266600 :                     .entry(res)
    1173       266600 :                     .or_default()
    1174       266600 :                     .add_key(key);
    1175       266600 :             }
    1176              : 
    1177       287920 :             key = key.next();
    1178              :         }
    1179              : 
    1180        14160 :         range_search_result
    1181        14160 :     }
    1182              : 
    1183              :     #[test]
    1184            4 :     fn ranged_search_on_empty_layer_map() {
    1185            4 :         let layer_map = LayerMap::default();
    1186            4 :         let range = Key::from_i128(100)..Key::from_i128(200);
    1187            4 : 
    1188            4 :         let res = layer_map.range_search(range.clone(), Lsn(100));
    1189            4 :         assert_range_search_result_eq(res, RangeSearchResult::new());
    1190            4 :     }
    1191              : 
    1192              :     #[tokio::test]
    1193            4 :     async fn ranged_search() {
    1194            4 :         let harness = TenantHarness::create("ranged_search").await.unwrap();
    1195            4 :         let (tenant, ctx) = harness.load().await;
    1196            4 :         let timeline_id = TimelineId::generate();
    1197            4 :         // Create the timeline such that the in-memory layers can be written
    1198            4 :         // to the timeline directory.
    1199            4 :         tenant
    1200            4 :             .create_test_timeline(timeline_id, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1201            4 :             .await
    1202            4 :             .unwrap();
    1203            4 : 
    1204            4 :         let gate = utils::sync::gate::Gate::default();
    1205            4 :         let add_in_memory_layer = async |layer_map: &mut LayerMap, lsn_range: Range<Lsn>| {
    1206            4 :             let layer = InMemoryLayer::create(
    1207            4 :                 harness.conf,
    1208            4 :                 timeline_id,
    1209            4 :                 harness.tenant_shard_id,
    1210            4 :                 lsn_range.start,
    1211            4 :                 &gate,
    1212            4 :                 &ctx,
    1213            4 :             )
    1214            4 :             .await
    1215            4 :             .unwrap();
    1216            4 : 
    1217            4 :             layer.freeze(lsn_range.end).await;
    1218            4 : 
    1219            4 :             layer_map.frozen_layers.push_back(Arc::new(layer));
    1220            4 :         };
    1221            4 : 
    1222            4 :         let in_memory_layer_configurations = [
    1223            4 :             vec![],
    1224            4 :             // Overlaps with the top-most image
    1225            4 :             vec![Lsn(35)..Lsn(50)],
    1226            4 :         ];
    1227            4 : 
    1228            4 :         let layers = vec![
    1229            4 :             LayerDesc {
    1230            4 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
    1231            4 :                 lsn_range: Lsn(5)..Lsn(6),
    1232            4 :                 is_delta: false,
    1233            4 :             },
    1234            4 :             LayerDesc {
    1235            4 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
    1236            4 :                 lsn_range: Lsn(5)..Lsn(20),
    1237            4 :                 is_delta: true,
    1238            4 :             },
    1239            4 :             LayerDesc {
    1240            4 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
    1241            4 :                 lsn_range: Lsn(20)..Lsn(30),
    1242            4 :                 is_delta: true,
    1243            4 :             },
    1244            4 :             LayerDesc {
    1245            4 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1246            4 :                 lsn_range: Lsn(25)..Lsn(35),
    1247            4 :                 is_delta: true,
    1248            4 :             },
    1249            4 :             LayerDesc {
    1250            4 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1251            4 :                 lsn_range: Lsn(40)..Lsn(41),
    1252            4 :                 is_delta: false,
    1253            4 :             },
    1254            4 :         ];
    1255            4 : 
    1256            4 :         let mut layer_map = create_layer_map(layers.clone());
    1257           12 :         for in_memory_layers in in_memory_layer_configurations {
    1258           12 :             for in_mem_layer_range in in_memory_layers {
    1259            4 :                 add_in_memory_layer(&mut layer_map, in_mem_layer_range).await;
    1260            4 :             }
    1261            4 : 
    1262          488 :             for start in 0..60 {
    1263        14160 :                 for end in (start + 1)..60 {
    1264        14160 :                     let range = Key::from_i128(start)..Key::from_i128(end);
    1265        14160 :                     let result = layer_map.range_search(range.clone(), Lsn(100));
    1266        14160 :                     let expected = brute_force_range_search(&layer_map, range, Lsn(100));
    1267        14160 : 
    1268        14160 :                     eprintln!("{start}..{end}: {result:?}");
    1269        14160 : 
    1270        14160 :                     assert_range_search_result_eq(result, expected);
    1271        14160 :                 }
    1272            4 :             }
    1273            4 :         }
    1274            4 :     }
    1275              : 
    1276              :     #[test]
    1277            4 :     fn layer_visibility_basic() {
    1278            4 :         // A simple synthetic input, as a smoke test.
    1279            4 :         let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
    1280            4 :         let timeline_id = TimelineId::generate();
    1281            4 :         let mut layer_map = LayerMap::default();
    1282            4 :         let mut updates = layer_map.batch_update();
    1283              : 
    1284              :         const FAKE_LAYER_SIZE: u64 = 1024;
    1285              : 
    1286            4 :         let inject_delta = |updates: &mut BatchedUpdates,
    1287              :                             key_start: i128,
    1288              :                             key_end: i128,
    1289              :                             lsn_start: u64,
    1290           28 :                             lsn_end: u64| {
    1291           28 :             let desc = PersistentLayerDesc::new_delta(
    1292           28 :                 tenant_shard_id,
    1293           28 :                 timeline_id,
    1294           28 :                 Range {
    1295           28 :                     start: Key::from_i128(key_start),
    1296           28 :                     end: Key::from_i128(key_end),
    1297           28 :                 },
    1298           28 :                 Range {
    1299           28 :                     start: Lsn(lsn_start),
    1300           28 :                     end: Lsn(lsn_end),
    1301           28 :                 },
    1302           28 :                 1024,
    1303           28 :             );
    1304           28 :             updates.insert_historic(desc.clone());
    1305           28 :             desc
    1306           28 :         };
    1307              : 
    1308            4 :         let inject_image =
    1309           24 :             |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
    1310           24 :                 let desc = PersistentLayerDesc::new_img(
    1311           24 :                     tenant_shard_id,
    1312           24 :                     timeline_id,
    1313           24 :                     Range {
    1314           24 :                         start: Key::from_i128(key_start),
    1315           24 :                         end: Key::from_i128(key_end),
    1316           24 :                     },
    1317           24 :                     Lsn(lsn),
    1318           24 :                     FAKE_LAYER_SIZE,
    1319           24 :                 );
    1320           24 :                 updates.insert_historic(desc.clone());
    1321           24 :                 desc
    1322           24 :             };
    1323              : 
    1324              :         //
    1325              :         // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
    1326              :         // we expect to handle.  You can follow these examples through in the same order as they would be processed
    1327              :         // by the function under test.
    1328              :         //
    1329              : 
    1330            4 :         let mut read_points = vec![Lsn(1000)];
    1331            4 : 
    1332            4 :         // A delta ahead of any image layer
    1333            4 :         let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
    1334            4 : 
    1335            4 :         // An image layer is visible and covers some layers beneath itself
    1336            4 :         let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
    1337            4 : 
    1338            4 :         // A delta layer covered by the image layer: should be covered
    1339            4 :         let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
    1340            4 : 
    1341            4 :         // A delta layer partially covered by an image layer: should be visible
    1342            4 :         let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
    1343            4 : 
    1344            4 :         // A delta layer not covered by an image layer: should be visible
    1345            4 :         let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
    1346            4 : 
    1347            4 :         // An image layer covered by the image layer above: should be covered
    1348            4 :         let covered_image = inject_image(&mut updates, 10, 20, 89);
    1349            4 : 
    1350            4 :         // An image layer partially covered by an image layer: should be visible
    1351            4 :         let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
    1352            4 : 
    1353            4 :         // An image layer not covered by an image layer: should be visible
    1354            4 :         let not_covered_image = inject_image(&mut updates, 1, 4, 89);
    1355            4 : 
    1356            4 :         // A read point: this will make subsequent layers below here visible, even if there are
    1357            4 :         // more recent layers covering them.
    1358            4 :         read_points.push(Lsn(80));
    1359            4 : 
    1360            4 :         // A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
    1361            4 :         let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
    1362            4 : 
    1363            4 :         // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
    1364            4 :         // the read point should make it visible, even though its end LSN is covered
    1365            4 :         let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
    1366            4 :         let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
    1367            4 :         read_points.push(Lsn(65));
    1368            4 :         let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
    1369            4 : 
    1370            4 :         let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
    1371            4 : 
    1372            4 :         updates.flush();
    1373            4 : 
    1374            4 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1375            4 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1376            4 : 
    1377            4 :         assert_eq!(
    1378            4 :             layer_visibilities.get(&ahead_layer),
    1379            4 :             Some(&LayerVisibilityHint::Visible)
    1380            4 :         );
    1381            4 :         assert_eq!(
    1382            4 :             layer_visibilities.get(&visible_covering_img),
    1383            4 :             Some(&LayerVisibilityHint::Visible)
    1384            4 :         );
    1385            4 :         assert_eq!(
    1386            4 :             layer_visibilities.get(&covered_delta),
    1387            4 :             Some(&LayerVisibilityHint::Covered)
    1388            4 :         );
    1389            4 :         assert_eq!(
    1390            4 :             layer_visibilities.get(&partially_covered_delta),
    1391            4 :             Some(&LayerVisibilityHint::Visible)
    1392            4 :         );
    1393            4 :         assert_eq!(
    1394            4 :             layer_visibilities.get(&not_covered_delta),
    1395            4 :             Some(&LayerVisibilityHint::Visible)
    1396            4 :         );
    1397            4 :         assert_eq!(
    1398            4 :             layer_visibilities.get(&covered_image),
    1399            4 :             Some(&LayerVisibilityHint::Covered)
    1400            4 :         );
    1401            4 :         assert_eq!(
    1402            4 :             layer_visibilities.get(&partially_covered_image),
    1403            4 :             Some(&LayerVisibilityHint::Visible)
    1404            4 :         );
    1405            4 :         assert_eq!(
    1406            4 :             layer_visibilities.get(&not_covered_image),
    1407            4 :             Some(&LayerVisibilityHint::Visible)
    1408            4 :         );
    1409            4 :         assert_eq!(
    1410            4 :             layer_visibilities.get(&covered_delta_below_read_point),
    1411            4 :             Some(&LayerVisibilityHint::Visible)
    1412            4 :         );
    1413            4 :         assert_eq!(
    1414            4 :             layer_visibilities.get(&covering_img_between_read_points),
    1415            4 :             Some(&LayerVisibilityHint::Visible)
    1416            4 :         );
    1417            4 :         assert_eq!(
    1418            4 :             layer_visibilities.get(&covered_delta_between_read_points),
    1419            4 :             Some(&LayerVisibilityHint::Covered)
    1420            4 :         );
    1421            4 :         assert_eq!(
    1422            4 :             layer_visibilities.get(&covered_delta_intersects_read_point),
    1423            4 :             Some(&LayerVisibilityHint::Visible)
    1424            4 :         );
    1425            4 :         assert_eq!(
    1426            4 :             layer_visibilities.get(&visible_img_after_last_read_point),
    1427            4 :             Some(&LayerVisibilityHint::Visible)
    1428            4 :         );
    1429              : 
    1430              :         // Shadow should include all the images below the last read point
    1431            4 :         let expected_shadow = KeySpace {
    1432            4 :             ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
    1433            4 :         };
    1434            4 :         assert_eq!(shadow, expected_shadow);
    1435            4 :     }
    1436              : 
    1437            4 :     fn fixture_path(relative: &str) -> PathBuf {
    1438            4 :         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
    1439            4 :     }
    1440              : 
    1441              :     #[test]
    1442            4 :     fn layer_visibility_realistic() {
    1443            4 :         // Load a large example layermap
    1444            4 :         let index_raw = std::fs::read_to_string(fixture_path(
    1445            4 :             "test_data/indices/mixed_workload/index_part.json",
    1446            4 :         ))
    1447            4 :         .unwrap();
    1448            4 :         let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
    1449            4 : 
    1450            4 :         let tenant_id = TenantId::generate();
    1451            4 :         let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1452            4 :         let timeline_id = TimelineId::generate();
    1453            4 : 
    1454            4 :         let mut layer_map = LayerMap::default();
    1455            4 :         let mut updates = layer_map.batch_update();
    1456         6260 :         for (layer_name, layer_metadata) in index.layer_metadata {
    1457         6256 :             let layer_desc = match layer_name {
    1458         3208 :                 LayerName::Image(layer_name) => PersistentLayerDesc {
    1459         3208 :                     key_range: layer_name.key_range.clone(),
    1460         3208 :                     lsn_range: layer_name.lsn_as_range(),
    1461         3208 :                     tenant_shard_id,
    1462         3208 :                     timeline_id,
    1463         3208 :                     is_delta: false,
    1464         3208 :                     file_size: layer_metadata.file_size,
    1465         3208 :                 },
    1466         3048 :                 LayerName::Delta(layer_name) => PersistentLayerDesc {
    1467         3048 :                     key_range: layer_name.key_range,
    1468         3048 :                     lsn_range: layer_name.lsn_range,
    1469         3048 :                     tenant_shard_id,
    1470         3048 :                     timeline_id,
    1471         3048 :                     is_delta: true,
    1472         3048 :                     file_size: layer_metadata.file_size,
    1473         3048 :                 },
    1474              :             };
    1475         6256 :             updates.insert_historic(layer_desc);
    1476              :         }
    1477            4 :         updates.flush();
    1478            4 : 
    1479            4 :         let read_points = vec![index.metadata.disk_consistent_lsn()];
    1480            4 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1481         6260 :         for (layer_desc, visibility) in &layer_visibilities {
    1482         6256 :             tracing::info!("{layer_desc:?}: {visibility:?}");
    1483         6256 :             eprintln!("{layer_desc:?}: {visibility:?}");
    1484              :         }
    1485              : 
    1486              :         // The shadow should be non-empty, since there were some image layers
    1487            4 :         assert!(!shadow.ranges.is_empty());
    1488              : 
    1489              :         // At least some layers should be marked covered
    1490            4 :         assert!(
    1491            4 :             layer_visibilities
    1492            4 :                 .iter()
    1493           76 :                 .any(|i| matches!(i.1, LayerVisibilityHint::Covered))
    1494            4 :         );
    1495              : 
    1496            4 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1497              : 
    1498              :         // Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
    1499         6260 :         for (layer_desc, visible) in &layer_visibilities {
    1500         6256 :             let mut coverage = KeySpaceRandomAccum::new();
    1501         6256 :             let mut covered_by = Vec::new();
    1502              : 
    1503      9784384 :             for other_layer in layer_map.iter_historic_layers() {
    1504      9784384 :                 if &other_layer == layer_desc {
    1505         6256 :                     continue;
    1506      9778128 :                 }
    1507      9778128 :                 if !other_layer.is_delta()
    1508      5014104 :                     && other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
    1509      2527024 :                     && other_layer.key_range.start <= layer_desc.key_range.end
    1510       930608 :                     && layer_desc.key_range.start <= other_layer.key_range.end
    1511       171292 :                 {
    1512       171292 :                     coverage.add_range(other_layer.get_key_range());
    1513       171292 :                     covered_by.push((*other_layer).clone());
    1514      9606836 :                 }
    1515              :             }
    1516         6256 :             let coverage = coverage.to_keyspace();
    1517              : 
    1518         6256 :             let expect_visible = if coverage.ranges.len() == 1
    1519         1512 :                 && coverage.contains(&layer_desc.key_range.start)
    1520           68 :                 && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
    1521              :             {
    1522           40 :                 LayerVisibilityHint::Covered
    1523              :             } else {
    1524         6216 :                 LayerVisibilityHint::Visible
    1525              :             };
    1526              : 
    1527         6256 :             if expect_visible != *visible {
    1528            0 :                 eprintln!(
    1529            0 :                     "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
    1530            0 :                     layer_desc.key_range.start,
    1531            0 :                     layer_desc.key_range.end,
    1532            0 :                     layer_desc.lsn_range.start,
    1533            0 :                     layer_desc.lsn_range.end,
    1534            0 :                     layer_desc.is_delta()
    1535            0 :                 );
    1536            0 :                 if expect_visible == LayerVisibilityHint::Covered {
    1537            0 :                     eprintln!("Covered by:");
    1538            0 :                     for other in covered_by {
    1539            0 :                         eprintln!(
    1540            0 :                             "  {}..{} @ {}",
    1541            0 :                             other.get_key_range().start,
    1542            0 :                             other.get_key_range().end,
    1543            0 :                             other.image_layer_lsn()
    1544            0 :                         );
    1545            0 :                     }
    1546            0 :                     if let Some(range) = coverage.ranges.first() {
    1547            0 :                         eprintln!(
    1548            0 :                             "Total coverage from contributing layers: {}..{}",
    1549            0 :                             range.start, range.end
    1550            0 :                         );
    1551            0 :                     } else {
    1552            0 :                         eprintln!(
    1553            0 :                             "Total coverage from contributing layers: {:?}",
    1554            0 :                             coverage.ranges
    1555            0 :                         );
    1556            0 :                     }
    1557            0 :                 }
    1558         6256 :             }
    1559         6256 :             assert_eq!(expect_visible, *visible);
    1560              :         }
    1561              : 
    1562              :         // Sanity: the layer that holds latest data for the DBDIR key should always be visible
    1563              :         // (just using this key as a key that will always exist for any layermap fixture)
    1564            4 :         let dbdir_layer = {
    1565            4 :             let readable_layer = layer_map
    1566            4 :                 .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
    1567            4 :                 .unwrap();
    1568            4 : 
    1569            4 :             match readable_layer.layer {
    1570            4 :                 ReadableLayerWeak::PersistentLayer(desc) => desc,
     1571            0 :                 ReadableLayerWeak::InMemoryLayer(_) => unreachable!("DBDIR_KEY should always be served from a persistent layer in this fixture"),
    1572              :             }
    1573              :         };
    1574            4 :         assert!(matches!(
    1575            4 :             layer_visibilities.get(&dbdir_layer).unwrap(),
    1576              :             LayerVisibilityHint::Visible
    1577              :         ));
    1578            4 :     }
    1579              : }
    1580              : 
    1581              : #[cfg(test)]
    1582              : mod select_layer_tests {
    1583              :     use super::*;
    1584              : 
    1585           68 :     fn create_persistent_layer(
    1586           68 :         start_lsn: u64,
    1587           68 :         end_lsn: u64,
    1588           68 :         is_delta: bool,
    1589           68 :     ) -> Arc<PersistentLayerDesc> {
    1590           68 :         if !is_delta {
    1591           32 :             assert_eq!(end_lsn, start_lsn + 1);
    1592           36 :         }
    1593              : 
    1594           68 :         Arc::new(PersistentLayerDesc::new_test(
    1595           68 :             Key::MIN..Key::MAX,
    1596           68 :             Lsn(start_lsn)..Lsn(end_lsn),
    1597           68 :             is_delta,
    1598           68 :         ))
    1599           68 :     }
    1600              : 
    1601           24 :     fn create_inmem_layer(start_lsn: u64, end_lsn: u64) -> InMemoryLayerDesc {
    1602           24 :         InMemoryLayerDesc {
    1603           24 :             handle: InMemoryLayerHandle::Open,
    1604           24 :             lsn_range: Lsn(start_lsn)..Lsn(end_lsn),
    1605           24 :         }
    1606           24 :     }
    1607              : 
    1608              :     #[test]
    1609            4 :     fn test_select_layer_empty() {
    1610            4 :         assert!(LayerMap::select_layer(None, None, None, Lsn(100)).is_none());
    1611            4 :     }
    1612              : 
    1613              :     #[test]
    1614            4 :     fn test_select_layer_only_delta() {
    1615            4 :         let delta = create_persistent_layer(10, 20, true);
    1616            4 :         let result = LayerMap::select_layer(Some(delta.clone()), None, None, Lsn(100)).unwrap();
    1617            4 : 
    1618            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1619            4 :         assert!(
    1620            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1621              :         );
    1622            4 :     }
    1623              : 
    1624              :     #[test]
    1625            4 :     fn test_select_layer_only_image() {
    1626            4 :         let image = create_persistent_layer(10, 11, false);
    1627            4 :         let result = LayerMap::select_layer(None, Some(image.clone()), None, Lsn(100)).unwrap();
    1628            4 : 
    1629            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1630            4 :         assert!(
    1631            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1632              :         );
    1633            4 :     }
    1634              : 
    1635              :     #[test]
    1636            4 :     fn test_select_layer_only_inmem() {
    1637            4 :         let inmem = create_inmem_layer(10, 20);
    1638            4 :         let result = LayerMap::select_layer(None, None, Some(inmem.clone()), Lsn(100)).unwrap();
    1639            4 : 
    1640            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1641            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1642            4 :     }
    1643              : 
    1644              :     #[test]
    1645            4 :     fn test_select_layer_image_inside_delta() {
    1646            4 :         let delta = create_persistent_layer(10, 20, true);
    1647            4 :         let image = create_persistent_layer(15, 16, false);
    1648            4 : 
    1649            4 :         let result =
    1650            4 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(100))
    1651            4 :                 .unwrap();
    1652            4 : 
    1653            4 :         assert_eq!(result.lsn_floor, Lsn(16));
    1654            4 :         assert!(
    1655            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1656              :         );
    1657              : 
    1658            4 :         let result = LayerMap::select_layer(
    1659            4 :             Some(delta.clone()),
    1660            4 :             Some(image.clone()),
    1661            4 :             None,
    1662            4 :             result.lsn_floor,
    1663            4 :         )
    1664            4 :         .unwrap();
    1665            4 : 
    1666            4 :         assert_eq!(result.lsn_floor, Lsn(15));
    1667            4 :         assert!(
    1668            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1669              :         );
    1670            4 :     }
    1671              : 
    1672              :     #[test]
    1673            4 :     fn test_select_layer_newer_image() {
    1674            4 :         let delta = create_persistent_layer(10, 20, true);
    1675            4 :         let image = create_persistent_layer(25, 26, false);
    1676            4 : 
    1677            4 :         let result =
    1678            4 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1679            4 :                 .unwrap();
    1680            4 : 
    1681            4 :         assert_eq!(result.lsn_floor, Lsn(25));
    1682            4 :         assert!(
    1683            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1684              :         );
    1685              : 
    1686            4 :         let result =
    1687            4 :             LayerMap::select_layer(Some(delta.clone()), None, None, result.lsn_floor).unwrap();
    1688            4 : 
    1689            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1690            4 :         assert!(
    1691            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1692              :         );
    1693            4 :     }
    1694              : 
    1695              :     #[test]
    1696            4 :     fn test_select_layer_delta_with_older_image() {
    1697            4 :         let delta = create_persistent_layer(15, 25, true);
    1698            4 :         let image = create_persistent_layer(10, 11, false);
    1699            4 : 
    1700            4 :         let result =
    1701            4 :             LayerMap::select_layer(Some(delta.clone()), Some(image.clone()), None, Lsn(30))
    1702            4 :                 .unwrap();
    1703            4 : 
    1704            4 :         assert_eq!(result.lsn_floor, Lsn(15));
    1705            4 :         assert!(
    1706            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1707              :         );
    1708              : 
    1709            4 :         let result =
    1710            4 :             LayerMap::select_layer(None, Some(image.clone()), None, result.lsn_floor).unwrap();
    1711            4 : 
    1712            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1713            4 :         assert!(
    1714            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1715              :         );
    1716            4 :     }
    1717              : 
    1718              :     #[test]
    1719            4 :     fn test_select_layer_image_inside_inmem() {
    1720            4 :         let image = create_persistent_layer(15, 16, false);
    1721            4 :         let inmem = create_inmem_layer(10, 25);
    1722            4 : 
    1723            4 :         let result =
    1724            4 :             LayerMap::select_layer(None, Some(image.clone()), Some(inmem.clone()), Lsn(30))
    1725            4 :                 .unwrap();
    1726            4 : 
    1727            4 :         assert_eq!(result.lsn_floor, Lsn(16));
    1728            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1729              : 
    1730            4 :         let result = LayerMap::select_layer(
    1731            4 :             None,
    1732            4 :             Some(image.clone()),
    1733            4 :             Some(inmem.clone()),
    1734            4 :             result.lsn_floor,
    1735            4 :         )
    1736            4 :         .unwrap();
    1737            4 : 
    1738            4 :         assert_eq!(result.lsn_floor, Lsn(15));
    1739            4 :         assert!(
    1740            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1741              :         );
    1742              : 
    1743            4 :         let result =
    1744            4 :             LayerMap::select_layer(None, None, Some(inmem.clone()), result.lsn_floor).unwrap();
    1745            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1746            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1747            4 :     }
    1748              : 
    1749              :     #[test]
    1750            4 :     fn test_select_layer_delta_inside_inmem() {
    1751            4 :         let delta_top = create_persistent_layer(15, 20, true);
    1752            4 :         let delta_bottom = create_persistent_layer(10, 15, true);
    1753            4 :         let inmem = create_inmem_layer(15, 25);
    1754            4 : 
    1755            4 :         let result =
    1756            4 :             LayerMap::select_layer(Some(delta_top.clone()), None, Some(inmem.clone()), Lsn(30))
    1757            4 :                 .unwrap();
    1758            4 : 
    1759            4 :         assert_eq!(result.lsn_floor, Lsn(20));
    1760            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1761              : 
    1762            4 :         let result = LayerMap::select_layer(
    1763            4 :             Some(delta_top.clone()),
    1764            4 :             None,
    1765            4 :             Some(inmem.clone()),
    1766            4 :             result.lsn_floor,
    1767            4 :         )
    1768            4 :         .unwrap();
    1769            4 : 
    1770            4 :         assert_eq!(result.lsn_floor, Lsn(15));
    1771            4 :         assert!(
    1772            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_top))
    1773              :         );
    1774              : 
    1775            4 :         let result = LayerMap::select_layer(
    1776            4 :             Some(delta_bottom.clone()),
    1777            4 :             None,
    1778            4 :             Some(inmem.clone()),
    1779            4 :             result.lsn_floor,
    1780            4 :         )
    1781            4 :         .unwrap();
    1782            4 :         assert_eq!(result.lsn_floor, Lsn(10));
    1783            4 :         assert!(
    1784            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta_bottom))
    1785              :         );
    1786            4 :     }
    1787              : 
    1788              :     #[test]
    1789            4 :     fn test_select_layer_all_overlap_1() {
    1790            4 :         let inmem = create_inmem_layer(10, 30);
    1791            4 :         let delta = create_persistent_layer(15, 25, true);
    1792            4 :         let image = create_persistent_layer(20, 21, false);
    1793            4 : 
    1794            4 :         let result = LayerMap::select_layer(
    1795            4 :             Some(delta.clone()),
    1796            4 :             Some(image.clone()),
    1797            4 :             Some(inmem.clone()),
    1798            4 :             Lsn(50),
    1799            4 :         )
    1800            4 :         .unwrap();
    1801            4 : 
    1802            4 :         assert_eq!(result.lsn_floor, Lsn(25));
    1803            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1804              : 
    1805            4 :         let result = LayerMap::select_layer(
    1806            4 :             Some(delta.clone()),
    1807            4 :             Some(image.clone()),
    1808            4 :             Some(inmem.clone()),
    1809            4 :             result.lsn_floor,
    1810            4 :         )
    1811            4 :         .unwrap();
    1812            4 : 
    1813            4 :         assert_eq!(result.lsn_floor, Lsn(21));
    1814            4 :         assert!(
    1815            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1816              :         );
    1817              : 
    1818            4 :         let result = LayerMap::select_layer(
    1819            4 :             Some(delta.clone()),
    1820            4 :             Some(image.clone()),
    1821            4 :             Some(inmem.clone()),
    1822            4 :             result.lsn_floor,
    1823            4 :         )
    1824            4 :         .unwrap();
    1825            4 : 
    1826            4 :         assert_eq!(result.lsn_floor, Lsn(20));
    1827            4 :         assert!(
    1828            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1829              :         );
    1830            4 :     }
    1831              : 
    1832              :     #[test]
    1833            4 :     fn test_select_layer_all_overlap_2() {
    1834            4 :         let inmem = create_inmem_layer(20, 30);
    1835            4 :         let delta = create_persistent_layer(10, 40, true);
    1836            4 :         let image = create_persistent_layer(25, 26, false);
    1837            4 : 
    1838            4 :         let result = LayerMap::select_layer(
    1839            4 :             Some(delta.clone()),
    1840            4 :             Some(image.clone()),
    1841            4 :             Some(inmem.clone()),
    1842            4 :             Lsn(50),
    1843            4 :         )
    1844            4 :         .unwrap();
    1845            4 : 
    1846            4 :         assert_eq!(result.lsn_floor, Lsn(26));
    1847            4 :         assert!(
    1848            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1849              :         );
    1850              : 
    1851            4 :         let result = LayerMap::select_layer(
    1852            4 :             Some(delta.clone()),
    1853            4 :             Some(image.clone()),
    1854            4 :             Some(inmem.clone()),
    1855            4 :             result.lsn_floor,
    1856            4 :         )
    1857            4 :         .unwrap();
    1858            4 : 
    1859            4 :         assert_eq!(result.lsn_floor, Lsn(25));
    1860            4 :         assert!(
    1861            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1862              :         );
    1863            4 :     }
    1864              : 
    1865              :     #[test]
    1866            4 :     fn test_select_layer_all_overlap_3() {
    1867            4 :         let inmem = create_inmem_layer(30, 40);
    1868            4 :         let delta = create_persistent_layer(10, 30, true);
    1869            4 :         let image = create_persistent_layer(20, 21, false);
    1870            4 : 
    1871            4 :         let result = LayerMap::select_layer(
    1872            4 :             Some(delta.clone()),
    1873            4 :             Some(image.clone()),
    1874            4 :             Some(inmem.clone()),
    1875            4 :             Lsn(50),
    1876            4 :         )
    1877            4 :         .unwrap();
    1878            4 : 
    1879            4 :         assert_eq!(result.lsn_floor, Lsn(30));
    1880            4 :         assert!(matches!(result.layer, ReadableLayerWeak::InMemoryLayer(l) if l == inmem));
    1881              : 
    1882            4 :         let result = LayerMap::select_layer(
    1883            4 :             Some(delta.clone()),
    1884            4 :             Some(image.clone()),
    1885            4 :             None,
    1886            4 :             result.lsn_floor,
    1887            4 :         )
    1888            4 :         .unwrap();
    1889            4 : 
    1890            4 :         assert_eq!(result.lsn_floor, Lsn(21));
    1891            4 :         assert!(
    1892            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &delta))
    1893              :         );
    1894              : 
    1895            4 :         let result = LayerMap::select_layer(
    1896            4 :             Some(delta.clone()),
    1897            4 :             Some(image.clone()),
    1898            4 :             None,
    1899            4 :             result.lsn_floor,
    1900            4 :         )
    1901            4 :         .unwrap();
    1902            4 : 
    1903            4 :         assert_eq!(result.lsn_floor, Lsn(20));
    1904            4 :         assert!(
    1905            4 :             matches!(result.layer, ReadableLayerWeak::PersistentLayer(l) if Arc::ptr_eq(&l, &image))
    1906              :         );
    1907            4 :     }
    1908              : }

Generated by: LCOV version 2.1-beta