LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions)
Test: 07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02
Coverage: Lines: 80.2 % (751 hit / 936 total), Functions: 87.1 % (61 hit / 70 total)

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! frozen and split up into new image and delta layers, and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
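                       : //!
                       : //! As a concrete illustration of the versioning idea, here is a minimal,
                       : //! hypothetical sketch (not the actual `historic_layer_coverage.rs` /
                       : //! `layer_coverage.rs` API): every insertion snapshots an immutable key
                       : //! coverage map and files that snapshot under the layer's start LSN; a search
                       : //! picks the newest snapshot whose LSN is below `end_lsn` and then looks up
                       : //! the key in it. The real implementation shares structure between versions
                       : //! instead of cloning, tracks key range ends and overlaps, and keeps delta
                       : //! and image coverage separately (see `select_layer`); the sketch ignores
                       : //! all of that.
                       : //!
                       : //! ```ignore
                       : //! use std::collections::BTreeMap;
                       : //! use std::sync::Arc;
                       : //!
                       : //! /// Hypothetical stand-in for the layer coverage: key range start -> layer name.
                       : //! #[derive(Default)]
                       : //! struct HistoricCoverage {
                       : //!     versions: BTreeMap<u64 /* start LSN */, Arc<BTreeMap<i128 /* key */, String>>>,
                       : //!     head: Arc<BTreeMap<i128, String>>,
                       : //! }
                       : //!
                       : //! impl HistoricCoverage {
                       : //!     /// Layers must be inserted in increasing start-LSN order.
                       : //!     fn insert(&mut self, lsn_start: u64, key_start: i128, layer: String) {
                       : //!         let mut next = (*self.head).clone(); // a persistent BST would share structure here
                       : //!         next.insert(key_start, layer);
                       : //!         self.head = Arc::new(next);
                       : //!         self.versions.insert(lsn_start, Arc::clone(&self.head));
                       : //!     }
                       : //!
                       : //!     /// Latest layer whose coverage starts at or before `key`, among the
                       : //!     /// layers whose start LSN is < `end_lsn`.
                       : //!     fn search(&self, key: i128, end_lsn: u64) -> Option<&String> {
                       : //!         let (_, version) = self.versions.range(..end_lsn).next_back()?;
                       : //!         version.range(..=key).next_back().map(|(_, layer)| layer)
                       : //!     }
                       : //! }
                       : //! ```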
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use crate::context::RequestContext;
      50              : use crate::keyspace::KeyPartitioning;
      51              : use crate::tenant::storage_layer::InMemoryLayer;
      52              : use anyhow::Result;
      53              : use pageserver_api::key::Key;
      54              : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
      55              : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
      56              : use std::collections::{HashMap, VecDeque};
      57              : use std::iter::Peekable;
      58              : use std::ops::Range;
      59              : use std::sync::Arc;
      60              : use tokio::sync::watch;
      61              : use utils::lsn::Lsn;
      62              : 
      63              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      64              : pub use historic_layer_coverage::LayerKey;
      65              : 
      66              : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
      67              : 
      68              : ///
      69              : /// LayerMap tracks what layers exist on a timeline.
      70              : ///
      71              : pub struct LayerMap {
      72              :     //
      73              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      74              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
       75              :     // the start LSN of the next InMemoryLayer that is to be created.
      76              :     //
      77              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      78              :     pub next_open_layer_at: Option<Lsn>,
      79              : 
      80              :     ///
      81              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      82              :     /// are no longer added to, but haven't been written out to disk
      83              :     /// yet. They contain WAL older than the current 'open_layer' or
      84              :     /// 'next_open_layer_at', but newer than any historic layer.
      85              :     /// The frozen layers are in order from oldest to newest, so that
      86              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      87              :     /// in the 'front'.
      88              :     ///
      89              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      90              : 
      91              :     /// Index of the historic layers optimized for search
      92              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      93              : 
      94              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
       95              :     /// So L0 layers are held in the l0_delta_layers vector, in addition to the R-tree.
      96              :     ///
      97              :     /// NB: make sure to notify `watch_l0_deltas` on changes.
      98              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      99              : 
     100              :     /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
     101              :     watch_l0_deltas: watch::Sender<usize>,
     102              : }
     103              : 
     104              : impl Default for LayerMap {
     105          912 :     fn default() -> Self {
     106          912 :         Self {
     107          912 :             open_layer: Default::default(),
     108          912 :             next_open_layer_at: Default::default(),
     109          912 :             frozen_layers: Default::default(),
     110          912 :             historic: Default::default(),
     111          912 :             l0_delta_layers: Default::default(),
     112          912 :             watch_l0_deltas: watch::channel(0).0,
     113          912 :         }
     114          912 :     }
     115              : }
     116              : 
     117              : /// The primary update API for the layer map.
     118              : ///
     119              : /// Batching historic layer insertions and removals is good for
      120              : /// performance, and this struct helps us do that correctly.
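                       : ///
                       : /// A minimal usage sketch, assuming we already hold a `&mut LayerMap` and some
                       : /// `PersistentLayerDesc` values (`new_desc` and `old_desc` are hypothetical):
                       : ///
                       : /// ```ignore
                       : /// let mut updates = layer_map.batch_update();
                       : /// updates.insert_historic(new_desc);
                       : /// updates.remove_historic(&old_desc);
                       : /// updates.flush(); // flushing also happens on drop; the coverage index is rebuilt once
                       : /// ```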
     121              : #[must_use]
     122              : pub struct BatchedUpdates<'a> {
      123              :     // While we hold this exclusive reference to the layer map, the type checker
     124              :     // will prevent us from accidentally reading any unflushed updates.
     125              :     layer_map: &'a mut LayerMap,
     126              : }
     127              : 
      128              : /// Provides the ability to batch more updates while hiding the read
      129              : /// API, so we don't accidentally read without flushing.
     130              : impl BatchedUpdates<'_> {
     131              :     ///
     132              :     /// Insert an on-disk layer.
     133              :     ///
     134              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     135         9792 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     136         9792 :         self.layer_map.insert_historic_noflush(layer_desc)
     137         9792 :     }
     138              : 
     139              :     ///
     140              :     /// Remove an on-disk layer from the map.
     141              :     ///
     142              :     /// This should be called when the corresponding file on disk has been deleted.
     143              :     ///
     144         1032 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     145         1032 :         self.layer_map.remove_historic_noflush(layer_desc)
     146         1032 :     }
     147              : 
     148              :     // We will flush on drop anyway, but this method makes it
     149              :     // more explicit that there is some work being done.
     150              :     /// Apply all updates
     151         3568 :     pub fn flush(self) {
     152         3568 :         // Flush happens on drop
     153         3568 :     }
     154              : }
     155              : 
     156              : // Ideally the flush() method should be called explicitly for more
      157              : // controlled execution. But if we forget, we'd rather flush on drop
     158              : // than panic later or read without flushing.
     159              : //
     160              : // TODO maybe warn if flush hasn't explicitly been called
     161              : impl Drop for BatchedUpdates<'_> {
     162         3568 :     fn drop(&mut self) {
     163         3568 :         self.layer_map.flush_updates();
     164         3568 :     }
     165              : }
     166              : 
     167              : /// Return value of LayerMap::search
     168              : #[derive(Eq, PartialEq, Debug, Hash)]
     169              : pub struct SearchResult {
     170              :     pub layer: Arc<PersistentLayerDesc>,
     171              :     pub lsn_floor: Lsn,
     172              : }
     173              : 
     174              : /// Return value of [`LayerMap::range_search`]
     175              : ///
     176              : /// Contains a mapping from a layer description to a keyspace
     177              : /// accumulator that contains all the keys which intersect the layer
     178              : /// from the original search space. Keys that were not found are accumulated
     179              : /// in a separate key space accumulator.
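                       : ///
                       : /// A hypothetical consumer sketch (`key_range` and `end_lsn` are assumed to be
                       : /// in scope):
                       : ///
                       : /// ```ignore
                       : /// let result = layer_map.range_search(key_range, end_lsn);
                       : /// for (SearchResult { layer, lsn_floor }, keys) in result.found {
                       : ///     // `keys` accumulates the parts of `key_range` served by `layer` down to `lsn_floor`
                       : /// }
                       : /// // `result.not_found` accumulates the parts with no covering historic layer
                       : /// ```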
     180              : #[derive(Debug)]
     181              : pub struct RangeSearchResult {
     182              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     183              :     pub not_found: KeySpaceAccum,
     184              : }
     185              : 
     186              : impl RangeSearchResult {
     187       951168 :     fn new() -> Self {
     188       951168 :         Self {
     189       951168 :             found: HashMap::new(),
     190       951168 :             not_found: KeySpaceAccum::new(),
     191       951168 :         }
     192       951168 :     }
     193              : }
     194              : 
     195              : /// Collector for results of range search queries on the LayerMap.
     196              : /// It should be provided with two iterators for the delta and image coverage
     197              : /// that contain all the changes for layers which intersect the range.
     198              : struct RangeSearchCollector<Iter>
     199              : where
     200              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     201              : {
     202              :     delta_coverage: Peekable<Iter>,
     203              :     image_coverage: Peekable<Iter>,
     204              :     key_range: Range<Key>,
     205              :     end_lsn: Lsn,
     206              : 
     207              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     208              :     current_image: Option<Arc<PersistentLayerDesc>>,
     209              : 
     210              :     result: RangeSearchResult,
     211              : }
     212              : 
     213              : #[derive(Debug)]
     214              : enum NextLayerType {
     215              :     Delta(i128),
     216              :     Image(i128),
     217              :     Both(i128),
     218              : }
     219              : 
     220              : impl NextLayerType {
     221       986732 :     fn next_change_at_key(&self) -> Key {
     222       986732 :         match self {
     223       287434 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     224        11844 :             NextLayerType::Image(at) => Key::from_i128(*at),
     225       687454 :             NextLayerType::Both(at) => Key::from_i128(*at),
     226              :         }
     227       986732 :     }
     228              : }
     229              : 
     230              : impl<Iter> RangeSearchCollector<Iter>
     231              : where
     232              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     233              : {
     234       490724 :     fn new(
     235       490724 :         key_range: Range<Key>,
     236       490724 :         end_lsn: Lsn,
     237       490724 :         delta_coverage: Iter,
     238       490724 :         image_coverage: Iter,
     239       490724 :     ) -> Self {
     240       490724 :         Self {
     241       490724 :             delta_coverage: delta_coverage.peekable(),
     242       490724 :             image_coverage: image_coverage.peekable(),
     243       490724 :             key_range,
     244       490724 :             end_lsn,
     245       490724 :             current_delta: None,
     246       490724 :             current_image: None,
     247       490724 :             result: RangeSearchResult::new(),
     248       490724 :         }
     249       490724 :     }
     250              : 
     251              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     252              :     /// One pointer tracks the start of the current range and the other tracks
     253              :     /// the beginning of the next range which will overlap with the next change
     254              :     /// in coverage across both image and delta.
     255       490724 :     fn collect(mut self) -> RangeSearchResult {
     256       490724 :         let next_layer_type = self.choose_next_layer_type();
     257       486212 :         let mut current_range_start = match next_layer_type {
     258              :             None => {
     259              :                 // No changes for the range
     260         4512 :                 self.pad_range(self.key_range.clone());
     261         4512 :                 return self.result;
     262              :             }
     263       486212 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     264            0 :                 // Changes only after the end of the range
     265            0 :                 self.pad_range(self.key_range.clone());
     266            0 :                 return self.result;
     267              :             }
     268       486212 :             Some(layer_type) => {
     269       486212 :                 // Changes for the range exist. Record anything before the first
     270       486212 :                 // coverage change as not found.
     271       486212 :                 let coverage_start = layer_type.next_change_at_key();
     272       486212 :                 let range_before = self.key_range.start..coverage_start;
     273       486212 :                 self.pad_range(range_before);
     274       486212 : 
     275       486212 :                 self.advance(&layer_type);
     276       486212 :                 coverage_start
     277              :             }
     278              :         };
     279              : 
     280       986732 :         while current_range_start < self.key_range.end {
     281       500520 :             let next_layer_type = self.choose_next_layer_type();
     282       500520 :             match next_layer_type {
     283        14308 :                 Some(t) => {
     284        14308 :                     let current_range_end = t.next_change_at_key();
     285        14308 :                     self.add_range(current_range_start..current_range_end);
     286        14308 :                     current_range_start = current_range_end;
     287        14308 : 
     288        14308 :                     self.advance(&t);
     289        14308 :                 }
     290       486212 :                 None => {
     291       486212 :                     self.add_range(current_range_start..self.key_range.end);
     292       486212 :                     current_range_start = self.key_range.end;
     293       486212 :                 }
     294              :             }
     295              :         }
     296              : 
     297       486212 :         self.result
     298       490724 :     }
     299              : 
     300              :     /// Mark a range as not found (i.e. no layers intersect it)
     301       492600 :     fn pad_range(&mut self, key_range: Range<Key>) {
     302       492600 :         if !key_range.is_empty() {
     303         8372 :             self.result.not_found.add_range(key_range);
     304       484228 :         }
     305       492600 :     }
     306              : 
      307              :     /// Select the appropriate layer for the given range and update
     308              :     /// the collector.
     309       500520 :     fn add_range(&mut self, covered_range: Range<Key>) {
     310       500520 :         let selected = LayerMap::select_layer(
     311       500520 :             self.current_delta.clone(),
     312       500520 :             self.current_image.clone(),
     313       500520 :             self.end_lsn,
     314       500520 :         );
     315       500520 : 
     316       500520 :         match selected {
     317       498644 :             Some(search_result) => self
     318       498644 :                 .result
     319       498644 :                 .found
     320       498644 :                 .entry(search_result)
     321       498644 :                 .or_default()
     322       498644 :                 .add_range(covered_range),
     323         1876 :             None => self.pad_range(covered_range),
     324              :         }
     325       500520 :     }
     326              : 
     327              :     /// Move to the next coverage change.
     328       500520 :     fn advance(&mut self, layer_type: &NextLayerType) {
     329       500520 :         match layer_type {
     330       145437 :             NextLayerType::Delta(_) => {
     331       145437 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     332       145437 :                 self.current_delta = layer;
     333       145437 :             }
     334         6832 :             NextLayerType::Image(_) => {
     335         6832 :                 let (_, layer) = self.image_coverage.next().unwrap();
     336         6832 :                 self.current_image = layer;
     337         6832 :             }
     338       348251 :             NextLayerType::Both(_) => {
     339       348251 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     340       348251 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     341       348251 : 
     342       348251 :                 self.current_image = image_layer;
     343       348251 :                 self.current_delta = delta_layer;
     344       348251 :             }
     345              :         }
     346       500520 :     }
     347              : 
      348              :     /// Pick the next coverage change: the one at the lesser key, or both if they're aligned.
     349       991244 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     350       991244 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     351       991244 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     352       991244 : 
     353       991244 :         match (next_delta_at, next_image_at) {
     354       490724 :             (None, None) => None,
     355       140393 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     356         5952 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     357       354175 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     358          880 :                 Some(NextLayerType::Image(*next_image_at))
     359              :             }
     360       353295 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     361         5044 :                 Some(NextLayerType::Delta(*next_delta_at))
     362              :             }
     363       348251 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     364              :         }
     365       991244 :     }
     366              : }
     367              : 
     368              : impl LayerMap {
     369              :     ///
     370              :     /// Find the latest layer (by lsn.end) that covers the given
     371              :     /// 'key', with lsn.start < 'end_lsn'.
     372              :     ///
     373              :     /// The caller of this function is the page reconstruction
     374              :     /// algorithm looking for the next relevant delta layer, or
     375              :     /// the terminal image layer. The caller will pass the lsn_floor
     376              :     /// value as end_lsn in the next call to search.
     377              :     ///
     378              :     /// If there's an image layer exactly below the given end_lsn,
      379              :     /// search should return that layer regardless of whether there are
     380              :     /// overlapping deltas.
     381              :     ///
      382              :     /// If the latest layer is a delta and there is an image below it
      383              :     /// that overlaps, the returned lsn_floor should be right above
      384              :     /// that image so we don't skip it in the search. Otherwise
      385              :     /// the returned lsn_floor should be the bottom of the delta layer,
      386              :     /// because we should make as much progress down the lsn axis
      387              :     /// as possible. It's fine if we skip some overlapping deltas this
      388              :     /// way, because the delta we return would contain the same
      389              :     /// WAL content.
     390              :     ///
     391              :     /// TODO: This API is convoluted and inefficient. If the caller
     392              :     /// makes N search calls, we'll end up finding the same latest
     393              :     /// image layer N times. We should either cache the latest image
     394              :     /// layer result, or simplify the api to `get_latest_image` and
     395              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     396              :     ///
     397              :     /// NOTE: This only searches the 'historic' layers, *not* the
     398              :     /// 'open' and 'frozen' layers!
     399              :     ///
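                       :     /// For illustration, the reconstruction loop conceptually looks like the
                       :     /// following hypothetical sketch (the real caller lives in the timeline
                       :     /// read path):
                       :     ///
                       :     /// ```ignore
                       :     /// let mut end_lsn = Lsn(request_lsn.0 + 1);
                       :     /// while let Some(SearchResult { layer, lsn_floor }) = layer_map.search(key, end_lsn) {
                       :     ///     // ... read data for `key` from `layer` ...
                       :     ///     if !layer.is_delta() {
                       :     ///         break; // an image layer completes the reconstruction
                       :     ///     }
                       :     ///     end_lsn = lsn_floor; // pass lsn_floor as end_lsn in the next call, as described above
                       :     /// }
                       :     /// ```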
     400       143964 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     401       143964 :         let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
     402       143964 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     403       143964 :         let latest_image = version.image_coverage.query(key.to_i128());
     404       143964 : 
     405       143964 :         Self::select_layer(latest_delta, latest_image, end_lsn)
     406       143964 :     }
     407              : 
     408       644484 :     fn select_layer(
     409       644484 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     410       644484 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     411       644484 :         end_lsn: Lsn,
     412       644484 :     ) -> Option<SearchResult> {
     413       644484 :         assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
     414       644484 :         assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
     415              : 
     416       644484 :         match (delta_layer, image_layer) {
     417        23196 :             (None, None) => None,
     418        73640 :             (None, Some(image)) => {
     419        73640 :                 let lsn_floor = image.get_lsn_range().start;
     420        73640 :                 Some(SearchResult {
     421        73640 :                     layer: image,
     422        73640 :                     lsn_floor,
     423        73640 :                 })
     424              :             }
     425       154197 :             (Some(delta), None) => {
     426       154197 :                 let lsn_floor = delta.get_lsn_range().start;
     427       154197 :                 Some(SearchResult {
     428       154197 :                     layer: delta,
     429       154197 :                     lsn_floor,
     430       154197 :                 })
     431              :             }
     432       393451 :             (Some(delta), Some(image)) => {
     433       393451 :                 let img_lsn = image.get_lsn_range().start;
     434       393451 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     435       393451 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     436       393451 :                 if image_is_newer || image_exact_match {
     437        61888 :                     Some(SearchResult {
     438        61888 :                         layer: image,
     439        61888 :                         lsn_floor: img_lsn,
     440        61888 :                     })
     441              :                 } else {
     442       331563 :                     let lsn_floor =
     443       331563 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     444       331563 :                     Some(SearchResult {
     445       331563 :                         layer: delta,
     446       331563 :                         lsn_floor,
     447       331563 :                     })
     448              :                 }
     449              :             }
     450              :         }
     451       644484 :     }
     452              : 
     453       944088 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     454       944088 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     455       490724 :             Some(version) => version,
     456              :             None => {
     457       453364 :                 let mut result = RangeSearchResult::new();
     458       453364 :                 result.not_found.add_range(key_range);
     459       453364 :                 return result;
     460              :             }
     461              :         };
     462              : 
     463       490724 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     464       490724 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     465       490724 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     466       490724 : 
     467       490724 :         let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
     468       490724 :         collector.collect()
     469       944088 :     }
     470              : 
     471              :     /// Start a batch of updates, applied on drop
     472         3568 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     473         3568 :         BatchedUpdates { layer_map: self }
     474         3568 :     }
     475              : 
     476              :     ///
     477              :     /// Insert an on-disk layer
     478              :     ///
     479              :     /// Helper function for BatchedUpdates::insert_historic
     480              :     ///
     481              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     482         9812 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     483         9812 :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     484         9812 : 
     485         9812 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     486         1996 :             self.l0_delta_layers.push(layer_desc.clone().into());
     487         1996 :             self.watch_l0_deltas
     488         1996 :                 .send_replace(self.l0_delta_layers.len());
     489         7816 :         }
     490              : 
     491         9812 :         self.historic.insert(
     492         9812 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     493         9812 :             layer_desc.into(),
     494         9812 :         );
     495         9812 :     }
     496              : 
     497              :     ///
     498              :     /// Remove an on-disk layer from the map.
     499              :     ///
     500              :     /// Helper function for BatchedUpdates::remove_historic
     501              :     ///
     502         1032 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     503         1032 :         self.historic
     504         1032 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     505         1032 :         let layer_key = layer_desc.key();
     506         1032 :         if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
     507          808 :             let len_before = self.l0_delta_layers.len();
     508          808 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     509        11048 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     510          808 :             self.l0_delta_layers = l0_delta_layers;
     511          808 :             self.watch_l0_deltas
     512          808 :                 .send_replace(self.l0_delta_layers.len());
     513          808 :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     514          808 :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     515          808 :             // vtable) pairs.
     516          808 :             assert_eq!(
     517          808 :                 self.l0_delta_layers.len(),
     518          808 :                 len_before - 1,
     519            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     520              :             );
     521          224 :         }
     522         1032 :     }
     523              : 
     524              :     /// Helper function for BatchedUpdates::drop.
     525         3572 :     pub(self) fn flush_updates(&mut self) {
     526         3572 :         self.historic.rebuild();
     527         3572 :     }
     528              : 
      529              :     /// Is there a newer image layer for the given key and LSN range? Or a set
      530              :     /// of image layers within the specified LSN range that together cover the
      531              :     /// entire specified key range?
     532              :     ///
     533              :     /// This is used for garbage collection, to determine if an old layer can
     534              :     /// be deleted.
     535           16 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     536           16 :         if key.is_empty() {
      537              :             // Vacuously true. There's a newer image for all 0 of the keys in the range.
     538            0 :             return true;
     539           16 :         }
     540              : 
     541           16 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     542           16 :             Some(v) => v,
     543            0 :             None => return false,
     544              :         };
     545              : 
     546           16 :         let start = key.start.to_i128();
     547           16 :         let end = key.end.to_i128();
     548           16 : 
     549           20 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     550           20 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     551            0 :             None => false,
     552           20 :         };
     553              : 
     554              :         // Check the start is covered
     555           16 :         if !layer_covers(version.image_coverage.query(start)) {
     556           12 :             return false;
     557            4 :         }
     558              : 
     559              :         // Check after all changes of coverage
     560            4 :         for (_, change_val) in version.image_coverage.range(start..end) {
     561            4 :             if !layer_covers(change_val) {
     562            0 :                 return false;
     563            4 :             }
     564              :         }
     565              : 
     566            4 :         true
     567           16 :     }
     568              : 
     569         7324 :     pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
     570         7324 :         self.historic.iter()
     571         7324 :     }
     572              : 
      573              :     /// Get a ref-counted pointer to the first in-memory layer that matches the provided predicate.
     574      2149811 :     pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
     575      2149811 :     where
     576      2149811 :         Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
     577      2149811 :     {
     578      2149811 :         if let Some(open) = &self.open_layer {
     579      1823122 :             if pred(open) {
     580      1208471 :                 return Some(open.clone());
     581       614651 :             }
     582       326689 :         }
     583              : 
     584       941340 :         self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
     585      2149811 :     }
     586              : 
     587              :     ///
     588              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     589              :     /// image layer that covers each range at the specified lsn (inclusive).
      590              :     /// This is used when creating new image layers.
     591           28 :     pub fn image_coverage(
     592           28 :         &self,
     593           28 :         key_range: &Range<Key>,
     594           28 :         lsn: Lsn,
     595           28 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     596           28 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     597           28 :             Some(v) => v,
     598            0 :             None => return vec![],
     599              :         };
     600              : 
     601           28 :         let start = key_range.start.to_i128();
     602           28 :         let end = key_range.end.to_i128();
     603           28 : 
     604           28 :         // Initialize loop variables
     605           28 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     606           28 :         let mut current_key = start;
     607           28 :         let mut current_val = version.image_coverage.query(start);
     608              : 
     609              :         // Loop through the change events and push intervals
     610           28 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     611            0 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     612            0 :             coverage.push((kr, current_val.take()));
     613            0 :             current_key = change_key;
     614            0 :             current_val.clone_from(&change_val);
     615            0 :         }
     616              : 
     617              :         // Add the final interval
     618           28 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     619           28 :         coverage.push((kr, current_val.take()));
     620           28 : 
     621           28 :         coverage
     622           28 :     }
     623              : 
     624              :     /// Check if the key range resembles that of an L0 layer.
     625        17094 :     pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
     626        17094 :         is_delta_layer && key_range == &(Key::MIN..Key::MAX)
     627        17094 :     }
     628              : 
     629              :     /// This function determines which layers are counted in `count_deltas`:
     630              :     /// layers that should count towards deciding whether or not to reimage
     631              :     /// a certain partition range.
     632              :     ///
     633              :     /// There are two kinds of layers we currently consider reimage-worthy:
     634              :     ///
     635              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     636              :     /// TODO Some of these layers are very sparse and cover the entire key
     637              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     638              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     639              :     ///      based on some of these factors:
     640              :     ///      a) whether this layer has any wal in this partition range
     641              :     ///      b) the size of the layer
     642              :     ///      c) the number of images needed to cover it
     643              :     ///      d) the estimated time until we'll have to reimage over it for GC
     644              :     ///
     645              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     646              :     /// them reimage-worthy only when the entire key space can be covered by very few
     647              :     /// images (currently 1).
     648              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     649              :     ///      implement that we need to plumb a lot more context into this function
     650              :     ///      than just the current partition_range.
     651            0 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     652            0 :         // Case 1
     653            0 :         if !Self::is_l0(&layer.key_range, layer.is_delta) {
     654            0 :             return true;
     655            0 :         }
     656            0 : 
     657            0 :         // Case 2
     658            0 :         if partition_range == &(Key::MIN..Key::MAX) {
     659            0 :             return true;
     660            0 :         }
     661            0 : 
     662            0 :         false
     663            0 :     }
     664              : 
     665              :     /// Count the height of the tallest stack of reimage-worthy deltas
     666              :     /// in this 2d region.
     667              :     ///
     668              :     /// If `limit` is provided we don't try to count above that number.
     669              :     ///
     670              :     /// This number is used to compute the largest number of deltas that
     671              :     /// we'll need to visit for any page reconstruction in this region.
     672              :     /// We use this heuristic to decide whether to create an image layer.
     673           28 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     674           28 :         // We get the delta coverage of the region, and for each part of the coverage
     675           28 :         // we recurse right underneath the delta. The recursion depth is limited by
     676           28 :         // the largest result this function could return, which is in practice between
     677           28 :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     678           28 : 
     679           28 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     680            0 :             return 0;
     681           28 :         }
     682              : 
     683           28 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     684           28 :             Some(v) => v,
     685            0 :             None => return 0,
     686              :         };
     687              : 
     688           28 :         let start = key.start.to_i128();
     689           28 :         let end = key.end.to_i128();
     690           28 : 
     691           28 :         // Initialize loop variables
     692           28 :         let mut max_stacked_deltas = 0;
     693           28 :         let mut current_key = start;
     694           28 :         let mut current_val = version.delta_coverage.query(start);
     695              : 
     696              :         // Loop through the delta coverage and recurse on each part
     697           28 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     698              :             // If there's a relevant delta in this part, add 1 and recurse down
     699            0 :             if let Some(val) = &current_val {
     700            0 :                 if val.get_lsn_range().end > lsn.start {
     701            0 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     702            0 :                     let lr = lsn.start..val.get_lsn_range().start;
     703            0 :                     if !kr.is_empty() {
     704            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
     705            0 :                         let new_limit = limit.map(|l| l - base_count);
     706            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     707            0 :                         max_stacked_deltas = std::cmp::max(
     708            0 :                             max_stacked_deltas,
     709            0 :                             base_count + max_stacked_deltas_underneath,
     710            0 :                         );
     711            0 :                     }
     712            0 :                 }
     713            0 :             }
     714              : 
     715            0 :             current_key = change_key;
     716            0 :             current_val.clone_from(&change_val);
     717              :         }
     718              : 
     719              :         // Consider the last part
     720           28 :         if let Some(val) = &current_val {
     721            0 :             if val.get_lsn_range().end > lsn.start {
     722            0 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     723            0 :                 let lr = lsn.start..val.get_lsn_range().start;
     724            0 : 
     725            0 :                 if !kr.is_empty() {
     726            0 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     727            0 :                     let new_limit = limit.map(|l| l - base_count);
     728            0 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     729            0 :                     max_stacked_deltas = std::cmp::max(
     730            0 :                         max_stacked_deltas,
     731            0 :                         base_count + max_stacked_deltas_underneath,
     732            0 :                     );
     733            0 :                 }
     734            0 :             }
     735           28 :         }
     736              : 
     737           28 :         max_stacked_deltas
     738           28 :     }
     739              : 
      740              :     /// Count how many reimage-worthy layers we need to visit for a given key-LSN pair.
     741              :     ///
     742              :     /// The `partition_range` argument is used as context for the reimage-worthiness decision.
     743              :     ///
     744              :     /// Used as a helper for correctness checks only. Performance not critical.
     745            0 :     pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
     746            0 :         match self.search(key, lsn) {
     747            0 :             Some(search_result) => {
     748            0 :                 if search_result.layer.is_incremental() {
     749            0 :                     (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
     750            0 :                         + self.get_difficulty(search_result.lsn_floor, key, partition_range)
     751              :                 } else {
     752            0 :                     0
     753              :                 }
     754              :             }
     755            0 :             None => 0,
     756              :         }
     757            0 :     }
     758              : 
     759              :     /// Used for correctness checking. Results are expected to be identical to
     760              :     /// self.get_difficulty_map. Assumes self.search is correct.
     761            0 :     pub fn get_difficulty_map_bruteforce(
     762            0 :         &self,
     763            0 :         lsn: Lsn,
     764            0 :         partitioning: &KeyPartitioning,
     765            0 :     ) -> Vec<usize> {
     766            0 :         // Looking at the difficulty as a function of key, it could only increase
     767            0 :         // when a delta layer starts or an image layer ends. Therefore it's sufficient
     768            0 :         // to check the difficulties at:
     769            0 :         // - the key.start for each non-empty part range
     770            0 :         // - the key.start for each delta
     771            0 :         // - the key.end for each image
     772            0 :         let keys_iter: Box<dyn Iterator<Item = Key>> = {
     773            0 :             let mut keys: Vec<Key> = self
     774            0 :                 .iter_historic_layers()
     775            0 :                 .map(|layer| {
     776            0 :                     if layer.is_incremental() {
     777            0 :                         layer.get_key_range().start
     778              :                     } else {
     779            0 :                         layer.get_key_range().end
     780              :                     }
     781            0 :                 })
     782            0 :                 .collect();
     783            0 :             keys.sort();
     784            0 :             Box::new(keys.into_iter())
     785            0 :         };
     786            0 :         let mut keys_iter = keys_iter.peekable();
     787            0 : 
     788            0 :         // Iter the partition and keys together and query all the necessary
     789            0 :         // keys, computing the max difficulty for each part.
     790            0 :         partitioning
     791            0 :             .parts
     792            0 :             .iter()
     793            0 :             .map(|part| {
     794            0 :                 let mut difficulty = 0;
     795              :                 // Partition ranges are assumed to be sorted and disjoint
     796              :                 // TODO assert it
     797            0 :                 for range in &part.ranges {
     798            0 :                     if !range.is_empty() {
     799            0 :                         difficulty =
     800            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
     801            0 :                     }
     802            0 :                     while let Some(key) = keys_iter.peek() {
     803            0 :                         if key >= &range.end {
     804            0 :                             break;
     805            0 :                         }
     806            0 :                         let key = keys_iter.next().unwrap();
     807            0 :                         if key < range.start {
     808            0 :                             continue;
     809            0 :                         }
     810            0 :                         difficulty =
     811            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
     812              :                     }
     813              :                 }
     814            0 :                 difficulty
     815            0 :             })
     816            0 :             .collect()
     817            0 :     }
     818              : 
     819              :     /// For each part of a keyspace partitioning, return the maximum number of layers
     820              :     /// that would be needed for page reconstruction in that part at the given LSN.
     821              :     ///
     822              :     /// If `limit` is provided we don't try to count above that number.
     823              :     ///
     824              :     /// This method is used to decide where to create new image layers. Computing the
     825              :     /// result for the entire partitioning at once allows this function to be more
     826              :     /// efficient, and further optimization is possible by using iterators instead,
     827              :     /// to allow early return.
     828              :     ///
     829              :     /// TODO actually use this method instead of count_deltas. Currently we only use
     830              :     ///      it for benchmarks.
     831            0 :     pub fn get_difficulty_map(
     832            0 :         &self,
     833            0 :         lsn: Lsn,
     834            0 :         partitioning: &KeyPartitioning,
     835            0 :         limit: Option<usize>,
     836            0 :     ) -> Vec<usize> {
     837            0 :         // TODO This is a naive implementation. Perf improvements to do:
     838            0 :         // 1. Instead of calling self.image_coverage and self.count_deltas,
     839            0 :         //    iterate the image and delta coverage only once.
     840            0 :         partitioning
     841            0 :             .parts
     842            0 :             .iter()
     843            0 :             .map(|part| {
     844            0 :                 let mut difficulty = 0;
     845            0 :                 for range in &part.ranges {
     846            0 :                     if limit == Some(difficulty) {
     847            0 :                         break;
     848            0 :                     }
     849            0 :                     for (img_range, last_img) in self.image_coverage(range, lsn) {
     850            0 :                         if limit == Some(difficulty) {
     851            0 :                             break;
     852            0 :                         }
     853            0 :                         let img_lsn = if let Some(last_img) = last_img {
     854            0 :                             last_img.get_lsn_range().end
     855              :                         } else {
     856            0 :                             Lsn(0)
     857              :                         };
     858              : 
     859            0 :                         if img_lsn < lsn {
     860            0 :                             let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
     861            0 :                             difficulty = std::cmp::max(difficulty, num_deltas);
     862            0 :                         }
     863              :                     }
     864              :                 }
     865            0 :                 difficulty
     866            0 :             })
     867            0 :             .collect()
     868            0 :     }
     869              : 
     870              :     /// Return all L0 delta layers
     871         3304 :     pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
     872         3304 :         &self.l0_delta_layers
     873         3304 :     }
     874              : 
     875              :     /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
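                       :     ///
                       :     /// A hypothetical subscriber sketch (run inside an async task):
                       :     ///
                       :     /// ```ignore
                       :     /// let mut rx = layer_map.watch_level0_deltas();
                       :     /// while rx.changed().await.is_ok() {
                       :     ///     let l0_count = *rx.borrow();
                       :     ///     // react to the new number of L0 delta layers ...
                       :     /// }
                       :     /// ```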
     876          884 :     pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
     877          884 :         self.watch_l0_deltas.subscribe()
     878          884 :     }
     879              : 
      880              :     /// Debugging function to print out the contents of the layer map.
     881              :     #[allow(unused)]
     882            4 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     883            4 :         println!("Begin dump LayerMap");
     884            4 : 
     885            4 :         println!("open_layer:");
     886            4 :         if let Some(open_layer) = &self.open_layer {
     887            0 :             open_layer.dump(verbose, ctx).await?;
     888            4 :         }
     889              : 
     890            4 :         println!("frozen_layers:");
     891            4 :         for frozen_layer in self.frozen_layers.iter() {
     892            0 :             frozen_layer.dump(verbose, ctx).await?;
     893              :         }
     894              : 
     895            4 :         println!("historic_layers:");
     896           24 :         for desc in self.iter_historic_layers() {
     897           24 :             desc.dump();
     898           24 :         }
     899            4 :         println!("End dump LayerMap");
     900            4 :         Ok(())
     901            4 :     }
     902              : 
     903              :     /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
     904              :     /// where we expect to serve reads.
     905              :     ///
     906              :     /// This function is O(N) and should be called infrequently.  The caller is responsible for
     907              :     /// looking up and updating the Layer objects for these layer descriptors.
     908          468 :     pub fn get_visibility(
     909          468 :         &self,
     910          468 :         mut read_points: Vec<Lsn>,
     911          468 :     ) -> (
     912          468 :         Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
     913          468 :         KeySpace,
     914          468 :     ) {
     915              :         // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
     916              :         // KeySpace is intended to be composed statically and iterated over.
     917              :         struct KeyShadow {
     918              :             // Map of range start to range end
     919              :             inner: RangeSetBlaze<i128>,
     920              :         }
     921              : 
     922              :         impl KeyShadow {
     923          468 :             fn new() -> Self {
     924          468 :                 Self {
     925          468 :                     inner: Default::default(),
     926          468 :                 }
     927          468 :             }
     928              : 
     929         3204 :             fn contains(&self, range: Range<Key>) -> bool {
     930         3204 :                 let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
     931         3204 :                 self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
     932         3204 :                     CheckSortedDisjoint::from([range_incl]),
     933         3204 :                 ))
     934         3204 :             }
     935              : 
     936              :             /// Add the input range to the keys covered by self.
     937              :             ///
     938              :             /// Return true if inserting this range covered some keys that were previously not covered
     939         3812 :             fn cover(&mut self, insert: Range<Key>) -> bool {
     940         3812 :                 let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
     941         3812 :                 self.inner.ranges_insert(range_incl)
     942         3812 :             }
     943              : 
     944          476 :             fn reset(&mut self) {
     945          476 :                 self.inner = Default::default();
     946          476 :             }
     947              : 
     948          468 :             fn to_keyspace(&self) -> KeySpace {
     949          468 :                 let mut accum = KeySpaceAccum::new();
     950          476 :                 for range_incl in self.inner.ranges() {
     951          476 :                     let range = Range {
     952          476 :                         start: Key::from_i128(*range_incl.start()),
     953          476 :                         end: Key::from_i128(range_incl.end() + 1),
     954          476 :                     };
     955          476 :                     accum.add_range(range)
     956              :                 }
     957              : 
     958          468 :                 accum.to_keyspace()
     959          468 :             }
     960              :         }
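                      :         // For intuition (hypothetical ranges): after cover(10..20) and cover(15..25) the
                      :         // shadow holds the single range 10..25, so contains(12..18) is true and
                      :         // contains(8..12) is false; the second cover() returned true only because 20..25
                      :         // was not yet covered.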
     961              : 
      962              :         // The 'shadow' is updated as we sweep through the layers from highest LSN down: an image layer adds its key range to the shadow (covering whatever lies beneath it),
      963              :         // and a ReadPoint resets the shadow, since we assume the whole key range is visible at a read point.
     964          468 :         read_points.sort_by_key(|rp| rp.0);
     965          468 :         let mut shadow = KeyShadow::new();
     966              : 
     967              :         // We will interleave all our read points and layers into a sorted collection
     968              :         enum Item {
     969              :             ReadPoint { lsn: Lsn },
     970              :             Layer(Arc<PersistentLayerDesc>),
     971              :         }
     972              : 
     973          468 :         let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
     974          468 :         items.extend(self.iter_historic_layers().map(Item::Layer));
     975          468 :         items.extend(
     976          468 :             read_points
     977          468 :                 .into_iter()
     978          476 :                 .map(|rp| Item::ReadPoint { lsn: rp }),
     979          468 :         );
     980          468 : 
     981          468 :         // Ordering: we want to iterate like this:
     982          468 :         // 1. Highest LSNs first
     983          468 :         // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
     984          468 :         // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
     985       127160 :         items.sort_by_key(|item| {
     986       127160 :             std::cmp::Reverse(match item {
     987       126272 :                 Item::Layer(layer) => {
     988       126272 :                     if layer.is_delta() {
     989        59256 :                         (Lsn(layer.get_lsn_range().end.0 - 1), 0)
     990              :                     } else {
     991        67016 :                         (layer.image_layer_lsn(), 1)
     992              :                     }
     993              :                 }
     994          888 :                 Item::ReadPoint { lsn } => (*lsn, 2),
     995              :             })
     996       127160 :         });
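                      :         // Worked example of the sort key (hypothetical LSNs): a ReadPoint at Lsn(80)
                      :         // keys as (80, 2), an image layer at Lsn(80) as (80, 1), and a delta layer whose
                      :         // lsn_range ends at Lsn(81) as (80, 0); under `Reverse` they are visited in that
                      :         // order, so the read point is seen before the image, and the image before the
                      :         // delta it may cover.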
     997          468 : 
     998          468 :         let mut results = Vec::with_capacity(self.historic.len());
     999          468 : 
    1000          468 :         let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
    1001              : 
    1002         7960 :         for item in items {
    1003         7492 :             let (reached_lsn, is_readpoint) = match &item {
    1004          476 :                 Item::ReadPoint { lsn } => (lsn, true),
    1005         7016 :                 Item::Layer(layer) => (&layer.lsn_range.start, false),
    1006              :             };
    1007         7492 :             maybe_covered_deltas.retain(|d| {
    1008          212 :                 if *reached_lsn >= d.lsn_range.start && is_readpoint {
    1009              :                     // We encountered a readpoint within the delta layer: it is visible
    1010              : 
    1011            4 :                     results.push((d.clone(), LayerVisibilityHint::Visible));
    1012            4 :                     false
    1013          208 :                 } else if *reached_lsn < d.lsn_range.start {
    1014              :                     // We passed the layer's range without encountering a read point: it is not visible
    1015           64 :                     results.push((d.clone(), LayerVisibilityHint::Covered));
    1016           64 :                     false
    1017              :                 } else {
    1018              :                     // We're still in the delta layer: continue iterating
    1019          144 :                     true
    1020              :                 }
    1021         7492 :             });
    1022         7492 : 
    1023         7492 :             match item {
    1024          476 :                 Item::ReadPoint { lsn: _lsn } => {
    1025          476 :                     // TODO: propagate the child timeline's shadow from its own run of this function, so that we don't have
    1026          476 :                     // to assume that the whole key range is visible at the branch point.
    1027          476 :                     shadow.reset();
    1028          476 :                 }
    1029         7016 :                 Item::Layer(layer) => {
    1030         7016 :                     let visibility = if layer.is_delta() {
    1031         3204 :                         if shadow.contains(layer.get_key_range()) {
    1032              :                             // If a layer isn't visible based on current state, we must defer deciding whether
    1033              :                             // it is truly not visible until we have advanced past the delta's range: we might
    1034              :                             // encounter another branch point within this delta layer's LSN range.
    1035           76 :                             maybe_covered_deltas.push(layer);
    1036           76 :                             continue;
    1037              :                         } else {
    1038         3128 :                             LayerVisibilityHint::Visible
    1039              :                         }
    1040              :                     } else {
    1041         3812 :                         let modified = shadow.cover(layer.get_key_range());
    1042         3812 :                         if modified {
    1043              :                             // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
    1044         3740 :                             LayerVisibilityHint::Visible
    1045              :                         } else {
    1046              :                             // An image layer in a region that was already covered
    1047           72 :                             LayerVisibilityHint::Covered
    1048              :                         }
    1049              :                     };
    1050              : 
    1051         6940 :                     results.push((layer, visibility));
    1052              :                 }
    1053              :             }
    1054              :         }
    1055              : 
    1056              :         // Drain any remaining maybe_covered deltas
    1057          468 :         results.extend(
    1058          468 :             maybe_covered_deltas
    1059          468 :                 .into_iter()
    1060          468 :                 .map(|d| (d, LayerVisibilityHint::Covered)),
    1061          468 :         );
    1062          468 : 
    1063          468 :         (results, shadow.to_keyspace())
    1064          468 :     }
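                      :     // How a caller might consume the result (sketch only; names are illustrative and
                      :     // not the actual call site):
                      :     //
                      :     //     let read_points = vec![disk_consistent_lsn /* plus any branch points */];
                      :     //     let (visibilities, shadow) = layer_map.get_visibility(read_points);
                      :     //     for (desc, hint) in visibilities {
                      :     //         // look up the Layer for `desc` and record `hint`
                      :     //         // (e.g. a caller might deprioritize Covered layers for residency)
                      :     //     }
                      :     //     // `shadow` is the keyspace covered by image layers below the oldest read point.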
    1065              : }
    1066              : 
    1067              : #[cfg(test)]
    1068              : mod tests {
    1069              :     use crate::tenant::{storage_layer::LayerName, IndexPart};
    1070              :     use pageserver_api::{
    1071              :         key::DBDIR_KEY,
    1072              :         keyspace::{KeySpace, KeySpaceRandomAccum},
    1073              :     };
    1074              :     use std::{collections::HashMap, path::PathBuf};
    1075              :     use utils::{
    1076              :         id::{TenantId, TimelineId},
    1077              :         shard::TenantShardId,
    1078              :     };
    1079              : 
    1080              :     use super::*;
    1081              : 
    1082              :     #[derive(Clone)]
    1083              :     struct LayerDesc {
    1084              :         key_range: Range<Key>,
    1085              :         lsn_range: Range<Lsn>,
    1086              :         is_delta: bool,
    1087              :     }
    1088              : 
    1089            4 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
    1090            4 :         let mut layer_map = LayerMap::default();
    1091              : 
    1092           24 :         for layer in layers {
    1093           20 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
    1094           20 :                 layer.key_range,
    1095           20 :                 layer.lsn_range,
    1096           20 :                 layer.is_delta,
    1097           20 :             ));
    1098           20 :         }
    1099              : 
    1100            4 :         layer_map.flush_updates();
    1101            4 :         layer_map
    1102            4 :     }
    1103              : 
    1104         7080 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
    1105         7080 :         assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
    1106         7080 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
    1107         7080 :             .found
    1108         7080 :             .into_iter()
    1109        16460 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1110         7080 :             .collect();
    1111         7080 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
    1112         7080 :             .found
    1113         7080 :             .into_iter()
    1114        16460 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
    1115         7080 :             .collect();
    1116         7080 : 
    1117         7080 :         assert_eq!(lhs, rhs);
    1118         7080 :     }
    1119              : 
    1120              :     #[cfg(test)]
    1121         7080 :     fn brute_force_range_search(
    1122         7080 :         layer_map: &LayerMap,
    1123         7080 :         key_range: Range<Key>,
    1124         7080 :         end_lsn: Lsn,
    1125         7080 :     ) -> RangeSearchResult {
    1126         7080 :         let mut range_search_result = RangeSearchResult::new();
    1127         7080 : 
    1128         7080 :         let mut key = key_range.start;
    1129       151040 :         while key != key_range.end {
    1130       143960 :             let res = layer_map.search(key, end_lsn);
    1131       143960 :             match res {
    1132       122640 :                 Some(res) => {
    1133       122640 :                     range_search_result
    1134       122640 :                         .found
    1135       122640 :                         .entry(res)
    1136       122640 :                         .or_default()
    1137       122640 :                         .add_key(key);
    1138       122640 :                 }
    1139        21320 :                 None => {
    1140        21320 :                     range_search_result.not_found.add_key(key);
    1141        21320 :                 }
    1142              :             }
    1143              : 
    1144       143960 :             key = key.next();
    1145              :         }
    1146              : 
    1147         7080 :         range_search_result
    1148         7080 :     }
    1149              : 
    1150              :     #[test]
    1151            4 :     fn ranged_search_on_empty_layer_map() {
    1152            4 :         let layer_map = LayerMap::default();
    1153            4 :         let range = Key::from_i128(100)..Key::from_i128(200);
    1154            4 : 
    1155            4 :         let res = layer_map.range_search(range.clone(), Lsn(100));
    1156            4 :         assert_eq!(
    1157            4 :             res.not_found.to_keyspace(),
    1158            4 :             KeySpace {
    1159            4 :                 ranges: vec![range]
    1160            4 :             }
    1161            4 :         );
    1162            4 :     }
    1163              : 
    1164              :     #[test]
    1165            4 :     fn ranged_search() {
    1166            4 :         let layers = vec![
    1167            4 :             LayerDesc {
    1168            4 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
    1169            4 :                 lsn_range: Lsn(0)..Lsn(5),
    1170            4 :                 is_delta: false,
    1171            4 :             },
    1172            4 :             LayerDesc {
    1173            4 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
    1174            4 :                 lsn_range: Lsn(5)..Lsn(20),
    1175            4 :                 is_delta: true,
    1176            4 :             },
    1177            4 :             LayerDesc {
    1178            4 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
    1179            4 :                 lsn_range: Lsn(20)..Lsn(30),
    1180            4 :                 is_delta: true,
    1181            4 :             },
    1182            4 :             LayerDesc {
    1183            4 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1184            4 :                 lsn_range: Lsn(25)..Lsn(35),
    1185            4 :                 is_delta: true,
    1186            4 :             },
    1187            4 :             LayerDesc {
    1188            4 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
    1189            4 :                 lsn_range: Lsn(35)..Lsn(40),
    1190            4 :                 is_delta: false,
    1191            4 :             },
    1192            4 :         ];
    1193            4 : 
    1194            4 :         let layer_map = create_layer_map(layers.clone());
    1195          244 :         for start in 0..60 {
    1196         7080 :             for end in (start + 1)..60 {
    1197         7080 :                 let range = Key::from_i128(start)..Key::from_i128(end);
    1198         7080 :                 let result = layer_map.range_search(range.clone(), Lsn(100));
    1199         7080 :                 let expected = brute_force_range_search(&layer_map, range, Lsn(100));
    1200         7080 : 
    1201         7080 :                 assert_range_search_result_eq(result, expected);
    1202         7080 :             }
    1203              :         }
    1204            4 :     }
    1205              : 
    1206              :     #[test]
    1207            4 :     fn layer_visibility_basic() {
    1208            4 :         // A simple synthetic input, as a smoke test.
    1209            4 :         let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
    1210            4 :         let timeline_id = TimelineId::generate();
    1211            4 :         let mut layer_map = LayerMap::default();
    1212            4 :         let mut updates = layer_map.batch_update();
    1213              : 
    1214              :         const FAKE_LAYER_SIZE: u64 = 1024;
    1215              : 
    1216            4 :         let inject_delta = |updates: &mut BatchedUpdates,
    1217              :                             key_start: i128,
    1218              :                             key_end: i128,
    1219              :                             lsn_start: u64,
    1220           28 :                             lsn_end: u64| {
    1221           28 :             let desc = PersistentLayerDesc::new_delta(
    1222           28 :                 tenant_shard_id,
    1223           28 :                 timeline_id,
    1224           28 :                 Range {
    1225           28 :                     start: Key::from_i128(key_start),
    1226           28 :                     end: Key::from_i128(key_end),
    1227           28 :                 },
    1228           28 :                 Range {
    1229           28 :                     start: Lsn(lsn_start),
    1230           28 :                     end: Lsn(lsn_end),
    1231           28 :                 },
    1232           28 :                 1024,
    1233           28 :             );
    1234           28 :             updates.insert_historic(desc.clone());
    1235           28 :             desc
    1236           28 :         };
    1237              : 
    1238            4 :         let inject_image =
    1239           24 :             |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
    1240           24 :                 let desc = PersistentLayerDesc::new_img(
    1241           24 :                     tenant_shard_id,
    1242           24 :                     timeline_id,
    1243           24 :                     Range {
    1244           24 :                         start: Key::from_i128(key_start),
    1245           24 :                         end: Key::from_i128(key_end),
    1246           24 :                     },
    1247           24 :                     Lsn(lsn),
    1248           24 :                     FAKE_LAYER_SIZE,
    1249           24 :                 );
    1250           24 :                 updates.insert_historic(desc.clone());
    1251           24 :                 desc
    1252           24 :             };
    1253              : 
    1254              :         //
    1255              :         // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
    1256              :         // we expect to handle.  You can follow these examples through in the same order as they would be processed
    1257              :         // by the function under test.
    1258              :         //
    1259              : 
    1260            4 :         let mut read_points = vec![Lsn(1000)];
    1261            4 : 
    1262            4 :         // A delta ahead of any image layer
    1263            4 :         let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
    1264            4 : 
    1265            4 :         // An image layer is visible and covers some layers beneath itself
    1266            4 :         let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
    1267            4 : 
    1268            4 :         // A delta layer covered by the image layer: should be covered
    1269            4 :         let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
    1270            4 : 
    1271            4 :         // A delta layer partially covered by an image layer: should be visible
    1272            4 :         let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
    1273            4 : 
    1274            4 :         // A delta layer not covered by an image layer: should be visible
    1275            4 :         let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
    1276            4 : 
    1277            4 :         // An image layer covered by the image layer above: should be covered
    1278            4 :         let covered_image = inject_image(&mut updates, 10, 20, 89);
    1279            4 : 
    1280            4 :         // An image layer partially covered by an image layer: should be visible
    1281            4 :         let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
    1282            4 : 
    1283            4 :         // An image layer not covered by an image layer: should be visible
    1284            4 :         let not_covered_image = inject_image(&mut updates, 1, 4, 89);
    1285            4 : 
    1286            4 :         // A read point: this will make subsequent layers below here visible, even if there are
    1287            4 :         // more recent layers covering them.
    1288            4 :         read_points.push(Lsn(80));
    1289            4 : 
    1290            4 :         // A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
    1291            4 :         let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
    1292            4 : 
    1293            4 :         // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
    1294            4 :         // the read point should make it visible, even though its end LSN is covered
    1295            4 :         let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
    1296            4 :         let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
    1297            4 :         read_points.push(Lsn(65));
    1298            4 :         let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
    1299            4 : 
    1300            4 :         let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
    1301            4 : 
    1302            4 :         updates.flush();
    1303            4 : 
    1304            4 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1305            4 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1306            4 : 
    1307            4 :         assert_eq!(
    1308            4 :             layer_visibilities.get(&ahead_layer),
    1309            4 :             Some(&LayerVisibilityHint::Visible)
    1310            4 :         );
    1311            4 :         assert_eq!(
    1312            4 :             layer_visibilities.get(&visible_covering_img),
    1313            4 :             Some(&LayerVisibilityHint::Visible)
    1314            4 :         );
    1315            4 :         assert_eq!(
    1316            4 :             layer_visibilities.get(&covered_delta),
    1317            4 :             Some(&LayerVisibilityHint::Covered)
    1318            4 :         );
    1319            4 :         assert_eq!(
    1320            4 :             layer_visibilities.get(&partially_covered_delta),
    1321            4 :             Some(&LayerVisibilityHint::Visible)
    1322            4 :         );
    1323            4 :         assert_eq!(
    1324            4 :             layer_visibilities.get(&not_covered_delta),
    1325            4 :             Some(&LayerVisibilityHint::Visible)
    1326            4 :         );
    1327            4 :         assert_eq!(
    1328            4 :             layer_visibilities.get(&covered_image),
    1329            4 :             Some(&LayerVisibilityHint::Covered)
    1330            4 :         );
    1331            4 :         assert_eq!(
    1332            4 :             layer_visibilities.get(&partially_covered_image),
    1333            4 :             Some(&LayerVisibilityHint::Visible)
    1334            4 :         );
    1335            4 :         assert_eq!(
    1336            4 :             layer_visibilities.get(&not_covered_image),
    1337            4 :             Some(&LayerVisibilityHint::Visible)
    1338            4 :         );
    1339            4 :         assert_eq!(
    1340            4 :             layer_visibilities.get(&covered_delta_below_read_point),
    1341            4 :             Some(&LayerVisibilityHint::Visible)
    1342            4 :         );
    1343            4 :         assert_eq!(
    1344            4 :             layer_visibilities.get(&covering_img_between_read_points),
    1345            4 :             Some(&LayerVisibilityHint::Visible)
    1346            4 :         );
    1347            4 :         assert_eq!(
    1348            4 :             layer_visibilities.get(&covered_delta_between_read_points),
    1349            4 :             Some(&LayerVisibilityHint::Covered)
    1350            4 :         );
    1351            4 :         assert_eq!(
    1352            4 :             layer_visibilities.get(&covered_delta_intersects_read_point),
    1353            4 :             Some(&LayerVisibilityHint::Visible)
    1354            4 :         );
    1355            4 :         assert_eq!(
    1356            4 :             layer_visibilities.get(&visible_img_after_last_read_point),
    1357            4 :             Some(&LayerVisibilityHint::Visible)
    1358            4 :         );
    1359              : 
    1360              :         // Shadow should include all the images below the last read point
    1361            4 :         let expected_shadow = KeySpace {
    1362            4 :             ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
    1363            4 :         };
    1364            4 :         assert_eq!(shadow, expected_shadow);
    1365            4 :     }
    1366              : 
    1367            4 :     fn fixture_path(relative: &str) -> PathBuf {
    1368            4 :         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
    1369            4 :     }
    1370              : 
    1371              :     #[test]
    1372            4 :     fn layer_visibility_realistic() {
    1373            4 :         // Load a large example layermap
    1374            4 :         let index_raw = std::fs::read_to_string(fixture_path(
    1375            4 :             "test_data/indices/mixed_workload/index_part.json",
    1376            4 :         ))
    1377            4 :         .unwrap();
    1378            4 :         let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
    1379            4 : 
    1380            4 :         let tenant_id = TenantId::generate();
    1381            4 :         let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1382            4 :         let timeline_id = TimelineId::generate();
    1383            4 : 
    1384            4 :         let mut layer_map = LayerMap::default();
    1385            4 :         let mut updates = layer_map.batch_update();
    1386         6260 :         for (layer_name, layer_metadata) in index.layer_metadata {
    1387         6256 :             let layer_desc = match layer_name {
    1388         3208 :                 LayerName::Image(layer_name) => PersistentLayerDesc {
    1389         3208 :                     key_range: layer_name.key_range.clone(),
    1390         3208 :                     lsn_range: layer_name.lsn_as_range(),
    1391         3208 :                     tenant_shard_id,
    1392         3208 :                     timeline_id,
    1393         3208 :                     is_delta: false,
    1394         3208 :                     file_size: layer_metadata.file_size,
    1395         3208 :                 },
    1396         3048 :                 LayerName::Delta(layer_name) => PersistentLayerDesc {
    1397         3048 :                     key_range: layer_name.key_range,
    1398         3048 :                     lsn_range: layer_name.lsn_range,
    1399         3048 :                     tenant_shard_id,
    1400         3048 :                     timeline_id,
    1401         3048 :                     is_delta: true,
    1402         3048 :                     file_size: layer_metadata.file_size,
    1403         3048 :                 },
    1404              :             };
    1405         6256 :             updates.insert_historic(layer_desc);
    1406              :         }
    1407            4 :         updates.flush();
    1408            4 : 
    1409            4 :         let read_points = vec![index.metadata.disk_consistent_lsn()];
    1410            4 :         let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
    1411         6260 :         for (layer_desc, visibility) in &layer_visibilities {
    1412         6256 :             tracing::info!("{layer_desc:?}: {visibility:?}");
    1413         6256 :             eprintln!("{layer_desc:?}: {visibility:?}");
    1414              :         }
    1415              : 
    1416              :         // The shadow should be non-empty, since there were some image layers
    1417            4 :         assert!(!shadow.ranges.is_empty());
    1418              : 
    1419              :         // At least some layers should be marked covered
    1420            4 :         assert!(layer_visibilities
    1421            4 :             .iter()
    1422           76 :             .any(|i| matches!(i.1, LayerVisibilityHint::Covered)));
    1423              : 
    1424            4 :         let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
    1425              : 
    1426              :         // Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
    1427         6260 :         for (layer_desc, visible) in &layer_visibilities {
    1428         6256 :             let mut coverage = KeySpaceRandomAccum::new();
    1429         6256 :             let mut covered_by = Vec::new();
    1430              : 
    1431      9784384 :             for other_layer in layer_map.iter_historic_layers() {
    1432      9784384 :                 if &other_layer == layer_desc {
    1433         6256 :                     continue;
    1434      9778128 :                 }
    1435      9778128 :                 if !other_layer.is_delta()
    1436      5014104 :                     && other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
    1437      2527024 :                     && other_layer.key_range.start <= layer_desc.key_range.end
    1438       930608 :                     && layer_desc.key_range.start <= other_layer.key_range.end
    1439       171292 :                 {
    1440       171292 :                     coverage.add_range(other_layer.get_key_range());
    1441       171292 :                     covered_by.push((*other_layer).clone());
    1442      9606836 :                 }
    1443              :             }
    1444         6256 :             let coverage = coverage.to_keyspace();
    1445              : 
    1446         6256 :             let expect_visible = if coverage.ranges.len() == 1
    1447         1512 :                 && coverage.contains(&layer_desc.key_range.start)
    1448           68 :                 && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
    1449              :             {
    1450           40 :                 LayerVisibilityHint::Covered
    1451              :             } else {
    1452         6216 :                 LayerVisibilityHint::Visible
    1453              :             };
    1454              : 
    1455         6256 :             if expect_visible != *visible {
    1456            0 :                 eprintln!(
    1457            0 :                     "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
    1458            0 :                     layer_desc.key_range.start,
    1459            0 :                     layer_desc.key_range.end,
    1460            0 :                     layer_desc.lsn_range.start,
    1461            0 :                     layer_desc.lsn_range.end,
    1462            0 :                     layer_desc.is_delta()
    1463            0 :                 );
    1464            0 :                 if expect_visible == LayerVisibilityHint::Covered {
    1465            0 :                     eprintln!("Covered by:");
    1466            0 :                     for other in covered_by {
    1467            0 :                         eprintln!(
    1468            0 :                             "  {}..{} @ {}",
    1469            0 :                             other.get_key_range().start,
    1470            0 :                             other.get_key_range().end,
    1471            0 :                             other.image_layer_lsn()
    1472            0 :                         );
    1473            0 :                     }
    1474            0 :                     if let Some(range) = coverage.ranges.first() {
    1475            0 :                         eprintln!(
    1476            0 :                             "Total coverage from contributing layers: {}..{}",
    1477            0 :                             range.start, range.end
    1478            0 :                         );
    1479            0 :                     } else {
    1480            0 :                         eprintln!(
    1481            0 :                             "Total coverage from contributing layers: {:?}",
    1482            0 :                             coverage.ranges
    1483            0 :                         );
    1484            0 :                     }
    1485            0 :                 }
    1486         6256 :             }
    1487         6256 :             assert_eq!(expect_visible, *visible);
    1488              :         }
    1489              : 
    1490              :         // Sanity: the layer that holds latest data for the DBDIR key should always be visible
    1491              :         // (just using this key as a key that will always exist for any layermap fixture)
    1492            4 :         let dbdir_layer = layer_map
    1493            4 :             .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
    1494            4 :             .unwrap();
    1495            4 :         assert!(matches!(
    1496            4 :             layer_visibilities.get(&dbdir_layer.layer).unwrap(),
    1497              :             LayerVisibilityHint::Visible
    1498              :         ));
    1499            4 :     }
    1500              : }
        

Generated by: LCOV version 2.1-beta