LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 78.0 % 563 439
Test Date: 2024-02-07 07:37:29 Functions: 79.7 % 59 47

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! frozen, and it is split up into new image and delta layers and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use crate::context::RequestContext;
      50              : use crate::keyspace::KeyPartitioning;
      51              : use crate::repository::Key;
      52              : use crate::tenant::storage_layer::InMemoryLayer;
      53              : use anyhow::Result;
      54              : use pageserver_api::keyspace::KeySpaceAccum;
      55              : use std::cmp::Ordering;
      56              : use std::collections::{BTreeMap, VecDeque};
      57              : use std::iter::Peekable;
      58              : use std::ops::Range;
      59              : use std::sync::Arc;
      60              : use utils::lsn::Lsn;
      61              : 
      62              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      63              : pub use historic_layer_coverage::LayerKey;
      64              : 
      65              : use super::storage_layer::PersistentLayerDesc;
      66              : 
      67              : ///
      68              : /// LayerMap tracks what layers exist on a timeline.
      69              : ///
      70         1572 : #[derive(Default)]
      71              : pub struct LayerMap {
      72              :     //
      73              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      74              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      75              :     // the start LSN of the next InMemoryLayer that is to be created.
      76              :     //
      77              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      78              :     pub next_open_layer_at: Option<Lsn>,
      79              : 
      80              :     ///
      81              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      82              :     /// are no longer added to, but haven't been written out to disk
      83              :     /// yet. They contain WAL older than the current 'open_layer' or
      84              :     /// 'next_open_layer_at', but newer than any historic layer.
      85              :     /// The frozen layers are in order from oldest to newest, so that
      86              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      87              :     /// in the 'front'.
      88              :     ///
      89              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      90              : 
      91              :     /// Index of the historic layers optimized for search
      92              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      93              : 
      94              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
      95              :     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
      96              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      97              : }
      98              : 
      99              : /// The primary update API for the layer map.
     100              : ///
     101              : /// Batching historic layer insertions and removals is good for
     102              : /// performance and this struct helps us do that correctly.
     103              : #[must_use]
     104              : pub struct BatchedUpdates<'a> {
     105              :     // While we hold this exclusive reference to the layer map the type checker
     106              :     // will prevent us from accidentally reading any unflushed updates.
     107              :     layer_map: &'a mut LayerMap,
     108              : }
     109              : 
     110              : /// Provide ability to batch more updates while hiding the read
     111              : /// API so we don't accidentally read without flushing.
     112              : impl BatchedUpdates<'_> {
     113              :     ///
     114              :     /// Insert an on-disk layer.
     115              :     ///
     116              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     117        78770 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     118        78770 :         self.layer_map.insert_historic_noflush(layer_desc)
     119        78770 :     }
     120              : 
     121              :     ///
     122              :     /// Remove an on-disk layer from the map.
     123              :     ///
     124              :     /// This should be called when the corresponding file on disk has been deleted.
     125              :     ///
     126         5410 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     127         5410 :         self.layer_map.remove_historic_noflush(layer_desc)
     128         5410 :     }
     129              : 
     130              :     // We will flush on drop anyway, but this method makes it
     131              :     // more explicit that there is some work being done.
     132              :     /// Apply all updates
     133         7507 :     pub fn flush(self) {
     134         7507 :         // Flush happens on drop
     135         7507 :     }
     136              : }
     137              : 
     138              : // Ideally the flush() method should be called explicitly for more
     139              : // controlled execution. But if we forget we'd rather flush on drop
     140              : // than panic later or read without flushing.
     141              : //
     142              : // TODO maybe warn if flush hasn't explicitly been called
     143              : impl Drop for BatchedUpdates<'_> {
     144         7507 :     fn drop(&mut self) {
     145         7507 :         self.layer_map.flush_updates();
     146         7507 :     }
     147              : }
     148              : 
     149              : /// Return value of LayerMap::search
     150         8230 : #[derive(Eq, PartialEq, Debug)]
     151              : pub struct SearchResult {
     152              :     pub layer: Arc<PersistentLayerDesc>,
     153              :     pub lsn_floor: Lsn,
     154              : }
     155              : 
     156              : pub struct OrderedSearchResult(SearchResult);
     157              : 
     158              : impl Ord for OrderedSearchResult {
     159        91470 :     fn cmp(&self, other: &Self) -> Ordering {
     160        91470 :         self.0.lsn_floor.cmp(&other.0.lsn_floor)
     161        91470 :     }
     162              : }
     163              : 
     164              : impl PartialOrd for OrderedSearchResult {
     165            0 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     166            0 :         Some(self.cmp(other))
     167            0 :     }
     168              : }
     169              : 
     170              : impl PartialEq for OrderedSearchResult {
     171            0 :     fn eq(&self, other: &Self) -> bool {
     172            0 :         self.0.lsn_floor == other.0.lsn_floor
     173            0 :     }
     174              : }
     175              : 
     176              : impl Eq for OrderedSearchResult {}
     177              : 
     178              : pub struct RangeSearchResult {
     179              :     pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
     180              :     pub not_found: KeySpaceAccum,
     181              : }
     182              : 
     183              : impl RangeSearchResult {
     184         7080 :     fn new() -> Self {
     185         7080 :         Self {
     186         7080 :             found: BTreeMap::new(),
     187         7080 :             not_found: KeySpaceAccum::new(),
     188         7080 :         }
     189         7080 :     }
     190              : }
     191              : 
     192              : /// Collector for results of range search queries on the LayerMap.
     193              : /// It should be provided with two iterators for the delta and image coverage
     194              : /// that contain all the changes for layers which intersect the range.
     195              : struct RangeSearchCollector<Iter>
     196              : where
     197              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     198              : {
     199              :     delta_coverage: Peekable<Iter>,
     200              :     image_coverage: Peekable<Iter>,
     201              :     key_range: Range<Key>,
     202              :     end_lsn: Lsn,
     203              : 
     204              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     205              :     current_image: Option<Arc<PersistentLayerDesc>>,
     206              : 
     207              :     result: RangeSearchResult,
     208              : }
     209              : 
     210            0 : #[derive(Debug)]
     211              : enum NextLayerType {
     212              :     Delta(i128),
     213              :     Image(i128),
     214              :     Both(i128),
     215              : }
     216              : 
     217              : impl NextLayerType {
     218        13836 :     fn next_change_at_key(&self) -> Key {
     219        13836 :         match self {
     220         4600 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     221         2484 :             NextLayerType::Image(at) => Key::from_i128(*at),
     222         6752 :             NextLayerType::Both(at) => Key::from_i128(*at),
     223              :         }
     224        13836 :     }
     225              : }
     226              : 
     227              : impl<Iter> RangeSearchCollector<Iter>
     228              : where
     229              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     230              : {
     231         3540 :     fn new(
     232         3540 :         key_range: Range<Key>,
     233         3540 :         end_lsn: Lsn,
     234         3540 :         delta_coverage: Iter,
     235         3540 :         image_coverage: Iter,
     236         3540 :     ) -> Self {
     237         3540 :         Self {
     238         3540 :             delta_coverage: delta_coverage.peekable(),
     239         3540 :             image_coverage: image_coverage.peekable(),
     240         3540 :             key_range,
     241         3540 :             end_lsn,
     242         3540 :             current_delta: None,
     243         3540 :             current_image: None,
     244         3540 :             result: RangeSearchResult::new(),
     245         3540 :         }
     246         3540 :     }
     247              : 
     248              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     249              :     /// One pointer tracks the start of the current range and the other tracks
     250              :     /// the beginning of the next range which will overlap with the next change
     251              :     /// in coverage across both image and delta.
     252         3540 :     fn collect(mut self) -> RangeSearchResult {
     253         3540 :         let next_layer_type = self.choose_next_layer_type();
     254         3358 :         let mut current_range_start = match next_layer_type {
     255              :             None => {
     256              :                 // No changes for the range
     257          182 :                 self.pad_range(self.key_range.clone());
     258          182 :                 return self.result;
     259              :             }
     260         3358 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     261            0 :                 // Changes only after the end of the range
     262            0 :                 self.pad_range(self.key_range.clone());
     263            0 :                 return self.result;
     264              :             }
     265         3358 :             Some(layer_type) => {
     266         3358 :                 // Changes for the range exist. Record anything before the first
     267         3358 :                 // coverage change as not found.
     268         3358 :                 let coverage_start = layer_type.next_change_at_key();
     269         3358 :                 let range_before = self.key_range.start..coverage_start;
     270         3358 :                 self.pad_range(range_before);
     271         3358 : 
     272         3358 :                 self.advance(&layer_type);
     273         3358 :                 coverage_start
     274              :             }
     275              :         };
     276              : 
     277        13836 :         while current_range_start < self.key_range.end {
     278        10478 :             let next_layer_type = self.choose_next_layer_type();
     279        10478 :             match next_layer_type {
     280         7120 :                 Some(t) => {
     281         7120 :                     let current_range_end = t.next_change_at_key();
     282         7120 :                     self.add_range(current_range_start..current_range_end);
     283         7120 :                     current_range_start = current_range_end;
     284         7120 : 
     285         7120 :                     self.advance(&t);
     286         7120 :                 }
     287         3358 :                 None => {
     288         3358 :                     self.add_range(current_range_start..self.key_range.end);
     289         3358 :                     current_range_start = self.key_range.end;
     290         3358 :                 }
     291              :             }
     292              :         }
     293              : 
     294         3358 :         self.result
     295         3540 :     }
     296              : 
     297              :     /// Mark a range as not found (i.e. no layers intersect it)
     298         4458 :     fn pad_range(&mut self, key_range: Range<Key>) {
     299         4458 :         if !key_range.is_empty() {
     300         2080 :             self.result.not_found.add_range(key_range);
     301         2378 :         }
     302         4458 :     }
     303              : 
     304              :     /// Select the appropriate layer for the given range and update
     305              :     /// the collector.
     306        10478 :     fn add_range(&mut self, covered_range: Range<Key>) {
     307        10478 :         let selected = LayerMap::select_layer(
     308        10478 :             self.current_delta.clone(),
     309        10478 :             self.current_image.clone(),
     310        10478 :             self.end_lsn,
     311        10478 :         );
     312        10478 : 
     313        10478 :         match selected {
     314         9560 :             Some(search_result) => self
     315         9560 :                 .result
     316         9560 :                 .found
     317         9560 :                 .entry(OrderedSearchResult(search_result))
     318         9560 :                 .or_default()
     319         9560 :                 .add_range(covered_range),
     320          918 :             None => self.pad_range(covered_range),
     321              :         }
     322        10478 :     }
     323              : 
     324              :     /// Move to the next coverage change.
     325        10478 :     fn advance(&mut self, layer_type: &NextLayerType) {
     326        10478 :         match layer_type {
     327         3150 :             NextLayerType::Delta(_) => {
     328         3150 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     329         3150 :                 self.current_delta = layer;
     330         3150 :             }
     331         1692 :             NextLayerType::Image(_) => {
     332         1692 :                 let (_, layer) = self.image_coverage.next().unwrap();
     333         1692 :                 self.current_image = layer;
     334         1692 :             }
     335         5636 :             NextLayerType::Both(_) => {
     336         5636 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     337         5636 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     338         5636 : 
     339         5636 :                 self.current_image = image_layer;
     340         5636 :                 self.current_delta = delta_layer;
     341         5636 :             }
     342              :         }
     343        10478 :     }
     344              : 
     345              :     /// Pick the next coverage change: the one at the lesser key or both if they're aligned.
     346        14018 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     347        14018 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     348        14018 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     349        14018 : 
     350        14018 :         match (next_delta_at, next_image_at) {
     351         3540 :             (None, None) => None,
     352          630 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     353         1260 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     354         8588 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     355          432 :                 Some(NextLayerType::Image(*next_image_at))
     356              :             }
     357         8156 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     358         2520 :                 Some(NextLayerType::Delta(*next_delta_at))
     359              :             }
     360         5636 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     361              :         }
     362        14018 :     }
     363              : }
     364              : 
     365              : impl LayerMap {
     366              :     ///
     367              :     /// Find the latest layer (by lsn.end) that covers the given
     368              :     /// 'key', with lsn.start < 'end_lsn'.
     369              :     ///
     370              :     /// The caller of this function is the page reconstruction
     371              :     /// algorithm looking for the next relevant delta layer, or
     372              :     /// the terminal image layer. The caller will pass the lsn_floor
     373              :     /// value as end_lsn in the next call to search.
     374              :     ///
     375              :     /// If there's an image layer exactly below the given end_lsn,
     376              :     /// search should return that layer regardless if there are
     377              :     /// overlapping deltas.
     378              :     ///
     379              :     /// If the latest layer is a delta and there is an overlapping
     380              :     /// image with it below, the lsn_floor returned should be right
     381              :     /// above that image so we don't skip it in the search. Otherwise
     382              :     /// the lsn_floor returned should be the bottom of the delta layer
     383              :     /// because we should make as much progress down the lsn axis
     384              :     /// as possible. It's fine if this way we skip some overlapping
     385              :     /// deltas, because the delta we returned would contain the same
     386              :     /// wal content.
     387              :     ///
     388              :     /// TODO: This API is convoluted and inefficient. If the caller
     389              :     /// makes N search calls, we'll end up finding the same latest
     390              :     /// image layer N times. We should either cache the latest image
     391              :     /// layer result, or simplify the api to `get_latest_image` and
     392              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     393              :     ///
     394              :     /// NOTE: This only searches the 'historic' layers, *not* the
     395              :     /// 'open' and 'frozen' layers!
     396              :     ///
     397     24145599 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     398     24145599 :         let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
     399     24132805 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     400     24132805 :         let latest_image = version.image_coverage.query(key.to_i128());
     401     24132805 : 
     402     24132805 :         Self::select_layer(latest_delta, latest_image, end_lsn)
     403     24145599 :     }
     404              : 
     405     24143283 :     fn select_layer(
     406     24143283 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     407     24143283 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     408     24143283 :         end_lsn: Lsn,
     409     24143283 :     ) -> Option<SearchResult> {
     410     24143283 :         assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
     411     24143283 :         assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
     412              : 
     413     24143283 :         match (delta_layer, image_layer) {
     414       137851 :             (None, None) => None,
     415        84084 :             (None, Some(image)) => {
     416        84084 :                 let lsn_floor = image.get_lsn_range().start;
     417        84084 :                 Some(SearchResult {
     418        84084 :                     layer: image,
     419        84084 :                     lsn_floor,
     420        84084 :                 })
     421              :             }
     422     19061016 :             (Some(delta), None) => {
     423     19061016 :                 let lsn_floor = delta.get_lsn_range().start;
     424     19061016 :                 Some(SearchResult {
     425     19061016 :                     layer: delta,
     426     19061016 :                     lsn_floor,
     427     19061016 :                 })
     428              :             }
     429      4860332 :             (Some(delta), Some(image)) => {
     430      4860332 :                 let img_lsn = image.get_lsn_range().start;
     431      4860332 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     432      4860332 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     433      4860332 :                 if image_is_newer || image_exact_match {
     434      1096341 :                     Some(SearchResult {
     435      1096341 :                         layer: image,
     436      1096341 :                         lsn_floor: img_lsn,
     437      1096341 :                     })
     438              :                 } else {
     439      3763991 :                     let lsn_floor =
     440      3763991 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     441      3763991 :                     Some(SearchResult {
     442      3763991 :                         layer: delta,
     443      3763991 :                         lsn_floor,
     444      3763991 :                     })
     445              :                 }
     446              :             }
     447              :         }
     448     24143283 :     }
     449              : 
     450         3542 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
     451         3542 :         let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
     452              : 
     453         3540 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     454         3540 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     455         3540 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     456         3540 : 
     457         3540 :         let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
     458         3540 :         Some(collector.collect())
     459         3542 :     }
     460              : 
     461              :     /// Start a batch of updates, applied on drop
     462         7507 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     463         7507 :         BatchedUpdates { layer_map: self }
     464         7507 :     }
     465              : 
     466              :     ///
     467              :     /// Insert an on-disk layer
     468              :     ///
     469              :     /// Helper function for BatchedUpdates::insert_historic
     470              :     ///
     471              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     472        78780 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     473        78780 :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     474        78780 : 
     475        78780 :         if Self::is_l0(&layer_desc) {
     476         6238 :             self.l0_delta_layers.push(layer_desc.clone().into());
     477        72542 :         }
     478              : 
     479        78780 :         self.historic.insert(
     480        78780 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     481        78780 :             layer_desc.into(),
     482        78780 :         );
     483        78780 :     }
     484              : 
     485              :     ///
     486              :     /// Remove an on-disk layer from the map.
     487              :     ///
     488              :     /// Helper function for BatchedUpdates::remove_historic
     489              :     ///
     490         5410 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     491         5410 :         self.historic
     492         5410 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     493         5410 :         let layer_key = layer_desc.key();
     494         5410 :         if Self::is_l0(layer_desc) {
     495         4213 :             let len_before = self.l0_delta_layers.len();
     496         4213 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     497        74302 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     498         4213 :             self.l0_delta_layers = l0_delta_layers;
     499         4213 :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     500         4213 :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     501         4213 :             // vtable) pairs.
     502         4213 :             assert_eq!(
     503         4213 :                 self.l0_delta_layers.len(),
     504         4213 :                 len_before - 1,
     505            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     506              :             );
     507         1197 :         }
     508         5410 :     }
     509              : 
     510              :     /// Helper function for BatchedUpdates::drop.
     511         7509 :     pub(self) fn flush_updates(&mut self) {
     512         7509 :         self.historic.rebuild();
     513         7509 :     }
     514              : 
     515              :     /// Is there a newer image layer for given key- and LSN-range? Or a set
     516              :     /// of image layers within the specified lsn range that cover the entire
     517              :     /// specified key range?
     518              :     ///
     519              :     /// This is used for garbage collection, to determine if an old layer can
     520              :     /// be deleted.
     521        14763 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     522        14763 :         if key.is_empty() {
     523              :             // Vacuously true. There's a newer image for all 0 of the keys in the range.
     524            0 :             return true;
     525        14763 :         }
     526              : 
     527        14763 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     528        14763 :             Some(v) => v,
     529            0 :             None => return false,
     530              :         };
     531              : 
     532        14763 :         let start = key.start.to_i128();
     533        14763 :         let end = key.end.to_i128();
     534        14763 : 
     535        16089 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     536        13140 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     537         2949 :             None => false,
     538        16089 :         };
     539              : 
     540              :         // Check the start is covered
     541        14763 :         if !layer_covers(version.image_coverage.query(start)) {
     542        13520 :             return false;
     543         1243 :         }
     544              : 
     545              :         // Check after all changes of coverage
     546         1326 :         for (_, change_val) in version.image_coverage.range(start..end) {
     547         1326 :             if !layer_covers(change_val) {
     548           46 :                 return false;
     549         1280 :             }
     550              :         }
     551              : 
     552         1197 :         true
     553        14763 :     }
     554              : 
     555         6919 :     pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
     556         6919 :         self.historic.iter()
     557         6919 :     }
     558              : 
     559              :     ///
     560              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     561              :     /// image layer that covers each range at the specified lsn (inclusive).
     562              :     /// This is used when creating new image layers.
     563      2238562 :     pub fn image_coverage(
     564      2238562 :         &self,
     565      2238562 :         key_range: &Range<Key>,
     566      2238562 :         lsn: Lsn,
     567      2238562 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     568      2238562 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     569      2045633 :             Some(v) => v,
     570       192929 :             None => return vec![],
     571              :         };
     572              : 
     573      2045633 :         let start = key_range.start.to_i128();
     574      2045633 :         let end = key_range.end.to_i128();
     575      2045633 : 
     576      2045633 :         // Initialize loop variables
     577      2045633 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     578      2045633 :         let mut current_key = start;
     579      2045633 :         let mut current_val = version.image_coverage.query(start);
     580              : 
     581              :         // Loop through the change events and push intervals
     582      2045633 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     583        16129 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     584        16129 :             coverage.push((kr, current_val.take()));
     585        16129 :             current_key = change_key;
     586        16129 :             current_val = change_val.clone();
     587        16129 :         }
     588              : 
     589              :         // Add the final interval
     590      2045633 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     591      2045633 :         coverage.push((kr, current_val.take()));
     592      2045633 : 
     593      2045633 :         coverage
     594      2238562 :     }
     595              : 
     596      2258870 :     pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
     597      2258870 :         layer.get_key_range() == (Key::MIN..Key::MAX)
     598      2258870 :     }
     599              : 
     600              :     /// This function determines which layers are counted in `count_deltas`:
     601              :     /// layers that should count towards deciding whether or not to reimage
     602              :     /// a certain partition range.
     603              :     ///
     604              :     /// There are two kinds of layers we currently consider reimage-worthy:
     605              :     ///
     606              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     607              :     /// TODO Some of these layers are very sparse and cover the entire key
     608              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     609              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     610              :     ///      based on some of these factors:
     611              :     ///      a) whether this layer has any wal in this partition range
     612              :     ///      b) the size of the layer
     613              :     ///      c) the number of images needed to cover it
     614              :     ///      d) the estimated time until we'll have to reimage over it for GC
     615              :     ///
     616              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     617              :     /// them reimage-worthy only when the entire key space can be covered by very few
     618              :     /// images (currently 1).
     619              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     620              :     ///      implement that we need to plumb a lot more context into this function
     621              :     ///      than just the current partition_range.
     622      2163950 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     623      2163950 :         // Case 1
     624      2163950 :         if !Self::is_l0(layer) {
     625       339030 :             return true;
     626      1824920 :         }
     627      1824920 : 
     628      1824920 :         // Case 2
     629      1824920 :         if partition_range == &(Key::MIN..Key::MAX) {
     630            0 :             return true;
     631      1824920 :         }
     632      1824920 : 
     633      1824920 :         false
     634      2163950 :     }
     635              : 
     636              :     /// Count the height of the tallest stack of reimage-worthy deltas
     637              :     /// in this 2d region.
     638              :     ///
     639              :     /// If `limit` is provided we don't try to count above that number.
     640              :     ///
     641              :     /// This number is used to compute the largest number of deltas that
     642              :     /// we'll need to visit for any page reconstruction in this region.
     643              :     /// We use this heuristic to decide whether to create an image layer.
     644      3984055 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     645      3984055 :         // We get the delta coverage of the region, and for each part of the coverage
     646      3984055 :         // we recurse right underneath the delta. The recursion depth is limited by
     647      3984055 :         // the largest result this function could return, which is in practice between
     648      3984055 :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     649      3984055 : 
     650      3984055 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     651       222073 :             return 0;
     652      3761982 :         }
     653              : 
     654      3761982 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     655      2585162 :             Some(v) => v,
     656      1176820 :             None => return 0,
     657              :         };
     658              : 
     659      2585162 :         let start = key.start.to_i128();
     660      2585162 :         let end = key.end.to_i128();
     661      2585162 : 
     662      2585162 :         // Initialize loop variables
     663      2585162 :         let mut max_stacked_deltas = 0;
     664      2585162 :         let mut current_key = start;
     665      2585162 :         let mut current_val = version.delta_coverage.query(start);
     666              : 
     667              :         // Loop through the delta coverage and recurse on each part
     668      2585162 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     669              :             // If there's a relevant delta in this part, add 1 and recurse down
     670        40714 :             if let Some(val) = current_val {
     671        40527 :                 if val.get_lsn_range().end > lsn.start {
     672        12061 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     673        12061 :                     let lr = lsn.start..val.get_lsn_range().start;
     674        12061 :                     if !kr.is_empty() {
     675         9202 :                         let base_count = Self::is_reimage_worthy(&val, key) as usize;
     676         9202 :                         let new_limit = limit.map(|l| l - base_count);
     677         9202 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     678         9202 :                         max_stacked_deltas = std::cmp::max(
     679         9202 :                             max_stacked_deltas,
     680         9202 :                             base_count + max_stacked_deltas_underneath,
     681         9202 :                         );
     682         9202 :                     }
     683        28466 :                 }
     684          187 :             }
     685              : 
     686        40714 :             current_key = change_key;
     687        40714 :             current_val = change_val.clone();
     688              :         }
     689              : 
     690              :         // Consider the last part
     691      2585161 :         if let Some(val) = current_val {
     692      2417532 :             if val.get_lsn_range().end > lsn.start {
     693      2154748 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     694      2154748 :                 let lr = lsn.start..val.get_lsn_range().start;
     695      2154748 : 
     696      2154748 :                 if !kr.is_empty() {
     697      2154748 :                     let base_count = Self::is_reimage_worthy(&val, key) as usize;
     698      2154748 :                     let new_limit = limit.map(|l| l - base_count);
     699      2154748 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     700      2154748 :                     max_stacked_deltas = std::cmp::max(
     701      2154748 :                         max_stacked_deltas,
     702      2154748 :                         base_count + max_stacked_deltas_underneath,
     703      2154748 :                     );
     704      2154748 :                 }
     705       262784 :             }
     706       167629 :         }
     707              : 
     708      2585161 :         max_stacked_deltas
     709      3984054 :     }
     710              : 
     711              :     /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
     712              :     ///
     713              :     /// The `partition_range` argument is used as context for the reimage-worthiness decision.
     714              :     ///
     715              :     /// Used as a helper for correctness checks only. Performance not critical.
     716            0 :     pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
     717            0 :         match self.search(key, lsn) {
     718            0 :             Some(search_result) => {
     719            0 :                 if search_result.layer.is_incremental() {
     720            0 :                     (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
     721            0 :                         + self.get_difficulty(search_result.lsn_floor, key, partition_range)
     722              :                 } else {
     723            0 :                     0
     724              :                 }
     725              :             }
     726            0 :             None => 0,
     727              :         }
     728            0 :     }
     729              : 
     730              :     /// Used for correctness checking. Results are expected to be identical to
     731              :     /// self.get_difficulty_map. Assumes self.search is correct.
     732            0 :     pub fn get_difficulty_map_bruteforce(
     733            0 :         &self,
     734            0 :         lsn: Lsn,
     735            0 :         partitioning: &KeyPartitioning,
     736            0 :     ) -> Vec<usize> {
     737            0 :         // Looking at the difficulty as a function of key, it could only increase
     738            0 :         // when a delta layer starts or an image layer ends. Therefore it's sufficient
     739            0 :         // to check the difficulties at:
     740            0 :         // - the key.start for each non-empty part range
     741            0 :         // - the key.start for each delta
     742            0 :         // - the key.end for each image
     743            0 :         let keys_iter: Box<dyn Iterator<Item = Key>> = {
     744            0 :             let mut keys: Vec<Key> = self
     745            0 :                 .iter_historic_layers()
     746            0 :                 .map(|layer| {
     747            0 :                     if layer.is_incremental() {
     748            0 :                         layer.get_key_range().start
     749              :                     } else {
     750            0 :                         layer.get_key_range().end
     751              :                     }
     752            0 :                 })
     753            0 :                 .collect();
     754            0 :             keys.sort();
     755            0 :             Box::new(keys.into_iter())
     756            0 :         };
     757            0 :         let mut keys_iter = keys_iter.peekable();
     758            0 : 
     759            0 :             // Iterate over the partitioning and keys together and query all the necessary
     760            0 :         // keys, computing the max difficulty for each part.
     761            0 :         partitioning
     762            0 :             .parts
     763            0 :             .iter()
     764            0 :             .map(|part| {
     765            0 :                 let mut difficulty = 0;
     766              :                 // Partition ranges are assumed to be sorted and disjoint
     767              :                 // TODO assert it
     768            0 :                 for range in &part.ranges {
     769            0 :                     if !range.is_empty() {
     770            0 :                         difficulty =
     771            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
     772            0 :                     }
     773            0 :                     while let Some(key) = keys_iter.peek() {
     774            0 :                         if key >= &range.end {
     775            0 :                             break;
     776            0 :                         }
     777            0 :                         let key = keys_iter.next().unwrap();
     778            0 :                         if key < range.start {
     779            0 :                             continue;
     780            0 :                         }
     781            0 :                         difficulty =
     782            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
     783              :                     }
     784              :                 }
     785            0 :                 difficulty
     786            0 :             })
     787            0 :             .collect()
     788            0 :     }
     789              : 
     790              :     /// For each part of a keyspace partitioning, return the maximum number of layers
     791              :     /// that would be needed for page reconstruction in that part at the given LSN.
     792              :     ///
     793              :     /// If `limit` is provided we don't try to count above that number.
     794              :     ///
     795              :     /// This method is used to decide where to create new image layers. Computing the
     796              :     /// result for the entire partitioning at once allows this function to be more
     797              :     /// efficient, and further optimization is possible by using iterators instead,
     798              :     /// to allow early return.
     799              :     ///
     800              :     /// TODO actually use this method instead of count_deltas. Currently we only use
     801              :     ///      it for benchmarks.
     802            0 :     pub fn get_difficulty_map(
     803            0 :         &self,
     804            0 :         lsn: Lsn,
     805            0 :         partitioning: &KeyPartitioning,
     806            0 :         limit: Option<usize>,
     807            0 :     ) -> Vec<usize> {
     808            0 :         // TODO This is a naive implementation. Perf improvements to do:
     809            0 :         // 1. Instead of calling self.image_coverage and self.count_deltas,
     810            0 :         //    iterate the image and delta coverage only once.
     811            0 :         partitioning
     812            0 :             .parts
     813            0 :             .iter()
     814            0 :             .map(|part| {
     815            0 :                 let mut difficulty = 0;
     816            0 :                 for range in &part.ranges {
     817            0 :                     if limit == Some(difficulty) {
     818            0 :                         break;
     819            0 :                     }
     820            0 :                     for (img_range, last_img) in self.image_coverage(range, lsn) {
     821            0 :                         if limit == Some(difficulty) {
     822            0 :                             break;
     823            0 :                         }
     824            0 :                         let img_lsn = if let Some(last_img) = last_img {
     825            0 :                             last_img.get_lsn_range().end
     826              :                         } else {
     827            0 :                             Lsn(0)
     828              :                         };
     829              : 
     830            0 :                         if img_lsn < lsn {
     831            0 :                             let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
     832            0 :                             difficulty = std::cmp::max(difficulty, num_deltas);
     833            0 :                         }
     834              :                     }
     835              :                 }
     836            0 :                 difficulty
     837            0 :             })
     838            0 :             .collect()
     839            0 :     }
     840              : 
     841              :     /// Return all L0 delta layers
     842         1521 :     pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
     843         1521 :         Ok(self.l0_delta_layers.to_vec())
     844         1521 :     }
     845              : 
     846              :     /// debugging function to print out the contents of the layer map
     847              :     #[allow(unused)]
     848            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     849            0 :         println!("Begin dump LayerMap");
     850            0 : 
     851            0 :         println!("open_layer:");
     852            0 :         if let Some(open_layer) = &self.open_layer {
     853            0 :             open_layer.dump(verbose, ctx).await?;
     854            0 :         }
     855              : 
     856            0 :         println!("frozen_layers:");
     857            0 :         for frozen_layer in self.frozen_layers.iter() {
     858            0 :             frozen_layer.dump(verbose, ctx).await?;
     859              :         }
     860              : 
     861            0 :         println!("historic_layers:");
     862            0 :         for desc in self.iter_historic_layers() {
     863            0 :             desc.dump();
     864            0 :         }
     865            0 :         println!("End dump LayerMap");
     866            0 :         Ok(())
     867            0 :     }
     868              : }
     869              : 
     870              : #[cfg(test)]
     871              : mod tests {
     872              :     use super::*;
     873              : 
     874           10 :     #[derive(Clone)]
     875              :     struct LayerDesc {
     876              :         key_range: Range<Key>,
     877              :         lsn_range: Range<Lsn>,
     878              :         is_delta: bool,
     879              :     }
     880              : 
     881            2 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
     882            2 :         let mut layer_map = LayerMap::default();
     883              : 
     884           12 :         for layer in layers {
     885           10 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
     886           10 :                 layer.key_range,
     887           10 :                 layer.lsn_range,
     888           10 :                 layer.is_delta,
     889           10 :             ));
     890           10 :         }
     891              : 
     892            2 :         layer_map.flush_updates();
     893            2 :         layer_map
     894            2 :     }
     895              : 
     896         3540 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
     897         3540 :         assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
     898         3540 :         let lhs: Vec<_> = lhs
     899         3540 :             .found
     900         3540 :             .into_iter()
     901         8230 :             .map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
     902         3540 :             .collect();
     903         3540 :         let rhs: Vec<_> = rhs
     904         3540 :             .found
     905         3540 :             .into_iter()
     906         8230 :             .map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
     907         3540 :             .collect();
     908         3540 : 
     909         3540 :         assert_eq!(lhs, rhs);
     910         3540 :     }
     911              : 
     912         3540 :     fn brute_force_range_search(
     913         3540 :         layer_map: &LayerMap,
     914         3540 :         key_range: Range<Key>,
     915         3540 :         end_lsn: Lsn,
     916         3540 :     ) -> RangeSearchResult {
     917         3540 :         let mut range_search_result = RangeSearchResult::new();
     918         3540 : 
     919         3540 :         let mut key = key_range.start;
     920        75520 :         while key != key_range.end {
     921        71980 :             let res = layer_map.search(key, end_lsn);
     922        71980 :             match res {
     923        61320 :                 Some(res) => {
     924        61320 :                     range_search_result
     925        61320 :                         .found
     926        61320 :                         .entry(OrderedSearchResult(res))
     927        61320 :                         .or_default()
     928        61320 :                         .add_key(key);
     929        61320 :                 }
     930        10660 :                 None => {
     931        10660 :                     range_search_result.not_found.add_key(key);
     932        10660 :                 }
     933              :             }
     934              : 
     935        71980 :             key = key.next();
     936              :         }
     937              : 
     938         3540 :         range_search_result
     939         3540 :     }
     940              : 
     941            2 :     #[test]
     942            2 :     fn ranged_search_on_empty_layer_map() {
     943            2 :         let layer_map = LayerMap::default();
     944            2 :         let range = Key::from_i128(100)..Key::from_i128(200);
     945            2 : 
     946            2 :         let res = layer_map.range_search(range, Lsn(100));
     947            2 :         assert!(res.is_none());
     948            2 :     }
     949              : 
     950            2 :     #[test]
     951            2 :     fn ranged_search() {
     952            2 :         let layers = vec![
     953            2 :             LayerDesc {
     954            2 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
     955            2 :                 lsn_range: Lsn(0)..Lsn(5),
     956            2 :                 is_delta: false,
     957            2 :             },
     958            2 :             LayerDesc {
     959            2 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
     960            2 :                 lsn_range: Lsn(5)..Lsn(20),
     961            2 :                 is_delta: true,
     962            2 :             },
     963            2 :             LayerDesc {
     964            2 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
     965            2 :                 lsn_range: Lsn(20)..Lsn(30),
     966            2 :                 is_delta: true,
     967            2 :             },
     968            2 :             LayerDesc {
     969            2 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
     970            2 :                 lsn_range: Lsn(25)..Lsn(35),
     971            2 :                 is_delta: true,
     972            2 :             },
     973            2 :             LayerDesc {
     974            2 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
     975            2 :                 lsn_range: Lsn(35)..Lsn(40),
     976            2 :                 is_delta: false,
     977            2 :             },
     978            2 :         ];
     979            2 : 
     980            2 :         let layer_map = create_layer_map(layers.clone());
     981          122 :         for start in 0..60 {
     982         3540 :             for end in (start + 1)..60 {
     983         3540 :                 let range = Key::from_i128(start)..Key::from_i128(end);
     984         3540 :                 let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
     985         3540 :                 let expected = brute_force_range_search(&layer_map, range, Lsn(100));
     986         3540 : 
     987         3540 :                 assert_range_search_result_eq(result, expected);
     988         3540 :             }
     989              :         }
     990            2 :     }
     991              : }
        

Generated by: LCOV version 2.1-beta