LCOV - code coverage report
Current view: top level - pageserver/src/tenant - layer_map.rs (source / functions) Coverage Total Hit
Test: 02e8c57acd6e2b986849f552ca30280d54699b79.info Lines: 73.1 % 568 415
Test Date: 2024-06-26 17:13:54 Functions: 80.4 % 51 41

            Line data    Source code
       1              : //!
       2              : //! The layer map tracks what layers exist in a timeline.
       3              : //!
       4              : //! When the timeline is first accessed, the server lists of all layer files
       5              : //! in the timelines/<timeline_id> directory, and populates this map with
       6              : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
       7              : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
       8              : //! records. Now and then, in the checkpoint() function, the in-memory layer is
       9              : //! are frozen, and it is split up into new image and delta layers and the
      10              : //! corresponding files are written to disk.
      11              : //!
      12              : //! Design overview:
      13              : //!
      14              : //! The `search` method of the layer map is on the read critical path, so we've
      15              : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
      16              : //! Other read methods are less critical but still impact performance of background tasks.
      17              : //!
      18              : //! This data structure relies on a persistent/immutable binary search tree. See the
      19              : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
      20              : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
      21              : //! you to modify the tree in such a way that each modification creates a new "version"
      22              : //! of the tree. When you modify it, you get a new version, but all previous versions are
      23              : //! still accessible too. So if someone is still holding a reference to an older version,
      24              : //! they continue to see the tree as it was then. The persistent BST stores all the
      25              : //! different versions in an efficient way.
      26              : //!
      27              : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
      28              : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
      29              : //! to handle the LSN dimension.
      30              : //!
      31              : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
      32              : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
      33              : //! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See
      34              : //! `historic_layer_coverage.rs`.
      35              : //!
      36              : //! To search for a particular key-LSN pair, you first look up the right "version" in the
      37              : //! BTreeMap. Then you search that version of the BST with the key.
      38              : //!
      39              : //! The persistent BST keeps all the versions, but there is no way to change the old versions
      40              : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
      41              : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
      42              : //! to throw away most of the persistent BST and build a new one, starting from the oldest
      43              : //! LSN. See [`LayerMap::flush_updates()`].
      44              : //!
      45              : 
      46              : mod historic_layer_coverage;
      47              : mod layer_coverage;
      48              : 
      49              : use crate::context::RequestContext;
      50              : use crate::keyspace::KeyPartitioning;
      51              : use crate::repository::Key;
      52              : use crate::tenant::storage_layer::InMemoryLayer;
      53              : use anyhow::Result;
      54              : use pageserver_api::keyspace::KeySpaceAccum;
      55              : use std::collections::{HashMap, VecDeque};
      56              : use std::iter::Peekable;
      57              : use std::ops::Range;
      58              : use std::sync::Arc;
      59              : use utils::lsn::Lsn;
      60              : 
      61              : use historic_layer_coverage::BufferedHistoricLayerCoverage;
      62              : pub use historic_layer_coverage::LayerKey;
      63              : 
      64              : use super::storage_layer::PersistentLayerDesc;
      65              : 
      66              : ///
      67              : /// LayerMap tracks what layers exist on a timeline.
      68              : ///
      69              : #[derive(Default)]
      70              : pub struct LayerMap {
      71              :     //
      72              :     // 'open_layer' holds the current InMemoryLayer that is accepting new
      73              :     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
      74              :     // where the start LSN of the next InMemoryLayer that is to be created.
      75              :     //
      76              :     pub open_layer: Option<Arc<InMemoryLayer>>,
      77              :     pub next_open_layer_at: Option<Lsn>,
      78              : 
      79              :     ///
      80              :     /// Frozen layers, if any. Frozen layers are in-memory layers that
      81              :     /// are no longer added to, but haven't been written out to disk
      82              :     /// yet. They contain WAL older than the current 'open_layer' or
      83              :     /// 'next_open_layer_at', but newer than any historic layer.
      84              :     /// The frozen layers are in order from oldest to newest, so that
      85              :     /// the newest one is in the 'back' of the VecDeque, and the oldest
      86              :     /// in the 'front'.
      87              :     ///
      88              :     pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
      89              : 
      90              :     /// Index of the historic layers optimized for search
      91              :     historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
      92              : 
      93              :     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
      94              :     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
      95              :     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
      96              : }
      97              : 
      98              : /// The primary update API for the layer map.
      99              : ///
     100              : /// Batching historic layer insertions and removals is good for
     101              : /// performance and this struct helps us do that correctly.
     102              : #[must_use]
     103              : pub struct BatchedUpdates<'a> {
     104              :     // While we hold this exclusive reference to the layer map the type checker
     105              :     // will prevent us from accidentally reading any unflushed updates.
     106              :     layer_map: &'a mut LayerMap,
     107              : }
     108              : 
     109              : /// Provide ability to batch more updates while hiding the read
     110              : /// API so we don't accidentally read without flushing.
     111              : impl BatchedUpdates<'_> {
     112              :     ///
     113              :     /// Insert an on-disk layer.
     114              :     ///
     115              :     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     116         1557 :     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
     117         1557 :         self.layer_map.insert_historic_noflush(layer_desc)
     118         1557 :     }
     119              : 
     120              :     ///
     121              :     /// Remove an on-disk layer from the map.
     122              :     ///
     123              :     /// This should be called when the corresponding file on disk has been deleted.
     124              :     ///
     125          434 :     pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
     126          434 :         self.layer_map.remove_historic_noflush(layer_desc)
     127          434 :     }
     128              : 
     129              :     // We will flush on drop anyway, but this method makes it
     130              :     // more explicit that there is some work being done.
     131              :     /// Apply all updates
     132         1599 :     pub fn flush(self) {
     133         1599 :         // Flush happens on drop
     134         1599 :     }
     135              : }
     136              : 
     137              : // Ideally the flush() method should be called explicitly for more
     138              : // controlled execution. But if we forget we'd rather flush on drop
     139              : // than panic later or read without flushing.
     140              : //
     141              : // TODO maybe warn if flush hasn't explicitly been called
     142              : impl Drop for BatchedUpdates<'_> {
     143         1599 :     fn drop(&mut self) {
     144         1599 :         self.layer_map.flush_updates();
     145         1599 :     }
     146              : }
     147              : 
     148              : /// Return value of LayerMap::search
     149              : #[derive(Eq, PartialEq, Debug, Hash)]
     150              : pub struct SearchResult {
     151              :     pub layer: Arc<PersistentLayerDesc>,
     152              :     pub lsn_floor: Lsn,
     153              : }
     154              : 
     155              : /// Return value of [`LayerMap::range_search`]
     156              : ///
     157              : /// Contains a mapping from a layer description to a keyspace
     158              : /// accumulator that contains all the keys which intersect the layer
     159              : /// from the original search space. Keys that were not found are accumulated
     160              : /// in a separate key space accumulator.
     161              : #[derive(Debug)]
     162              : pub struct RangeSearchResult {
     163              :     pub found: HashMap<SearchResult, KeySpaceAccum>,
     164              :     pub not_found: KeySpaceAccum,
     165              : }
     166              : 
     167              : impl RangeSearchResult {
     168        79173 :     fn new() -> Self {
     169        79173 :         Self {
     170        79173 :             found: HashMap::new(),
     171        79173 :             not_found: KeySpaceAccum::new(),
     172        79173 :         }
     173        79173 :     }
     174              : }
     175              : 
     176              : /// Collector for results of range search queries on the LayerMap.
     177              : /// It should be provided with two iterators for the delta and image coverage
     178              : /// that contain all the changes for layers which intersect the range.
     179              : struct RangeSearchCollector<Iter>
     180              : where
     181              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     182              : {
     183              :     delta_coverage: Peekable<Iter>,
     184              :     image_coverage: Peekable<Iter>,
     185              :     key_range: Range<Key>,
     186              :     end_lsn: Lsn,
     187              : 
     188              :     current_delta: Option<Arc<PersistentLayerDesc>>,
     189              :     current_image: Option<Arc<PersistentLayerDesc>>,
     190              : 
     191              :     result: RangeSearchResult,
     192              : }
     193              : 
     194              : #[derive(Debug)]
     195              : enum NextLayerType {
     196              :     Delta(i128),
     197              :     Image(i128),
     198              :     Both(i128),
     199              : }
     200              : 
     201              : impl NextLayerType {
     202       141886 :     fn next_change_at_key(&self) -> Key {
     203       141886 :         match self {
     204        32292 :             NextLayerType::Delta(at) => Key::from_i128(*at),
     205         2568 :             NextLayerType::Image(at) => Key::from_i128(*at),
     206       107026 :             NextLayerType::Both(at) => Key::from_i128(*at),
     207              :         }
     208       141886 :     }
     209              : }
     210              : 
     211              : impl<Iter> RangeSearchCollector<Iter>
     212              : where
     213              :     Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
     214              : {
     215        75595 :     fn new(
     216        75595 :         key_range: Range<Key>,
     217        75595 :         end_lsn: Lsn,
     218        75595 :         delta_coverage: Iter,
     219        75595 :         image_coverage: Iter,
     220        75595 :     ) -> Self {
     221        75595 :         Self {
     222        75595 :             delta_coverage: delta_coverage.peekable(),
     223        75595 :             image_coverage: image_coverage.peekable(),
     224        75595 :             key_range,
     225        75595 :             end_lsn,
     226        75595 :             current_delta: None,
     227        75595 :             current_image: None,
     228        75595 :             result: RangeSearchResult::new(),
     229        75595 :         }
     230        75595 :     }
     231              : 
     232              :     /// Run the collector. Collection is implemented via a two pointer algorithm.
     233              :     /// One pointer tracks the start of the current range and the other tracks
     234              :     /// the beginning of the next range which will overlap with the next change
     235              :     /// in coverage across both image and delta.
     236        75595 :     fn collect(mut self) -> RangeSearchResult {
     237        75595 :         let next_layer_type = self.choose_next_layer_type();
     238        67365 :         let mut current_range_start = match next_layer_type {
     239              :             None => {
     240              :                 // No changes for the range
     241         8230 :                 self.pad_range(self.key_range.clone());
     242         8230 :                 return self.result;
     243              :             }
     244        67365 :             Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
     245            0 :                 // Changes only after the end of the range
     246            0 :                 self.pad_range(self.key_range.clone());
     247            0 :                 return self.result;
     248              :             }
     249        67365 :             Some(layer_type) => {
     250        67365 :                 // Changes for the range exist. Record anything before the first
     251        67365 :                 // coverage change as not found.
     252        67365 :                 let coverage_start = layer_type.next_change_at_key();
     253        67365 :                 let range_before = self.key_range.start..coverage_start;
     254        67365 :                 self.pad_range(range_before);
     255        67365 : 
     256        67365 :                 self.advance(&layer_type);
     257        67365 :                 coverage_start
     258              :             }
     259              :         };
     260              : 
     261       141886 :         while current_range_start < self.key_range.end {
     262        74521 :             let next_layer_type = self.choose_next_layer_type();
     263        74521 :             match next_layer_type {
     264         7156 :                 Some(t) => {
     265         7156 :                     let current_range_end = t.next_change_at_key();
     266         7156 :                     self.add_range(current_range_start..current_range_end);
     267         7156 :                     current_range_start = current_range_end;
     268         7156 : 
     269         7156 :                     self.advance(&t);
     270         7156 :                 }
     271        67365 :                 None => {
     272        67365 :                     self.add_range(current_range_start..self.key_range.end);
     273        67365 :                     current_range_start = self.key_range.end;
     274        67365 :                 }
     275              :             }
     276              :         }
     277              : 
     278        67365 :         self.result
     279        75595 :     }
     280              : 
     281              :     /// Mark a range as not found (i.e. no layers intersect it)
     282        76527 :     fn pad_range(&mut self, key_range: Range<Key>) {
     283        76527 :         if !key_range.is_empty() {
     284        10146 :             self.result.not_found.add_range(key_range);
     285        66381 :         }
     286        76527 :     }
     287              : 
     288              :     /// Select the appropiate layer for the given range and update
     289              :     /// the collector.
     290        74521 :     fn add_range(&mut self, covered_range: Range<Key>) {
     291        74521 :         let selected = LayerMap::select_layer(
     292        74521 :             self.current_delta.clone(),
     293        74521 :             self.current_image.clone(),
     294        74521 :             self.end_lsn,
     295        74521 :         );
     296        74521 : 
     297        74521 :         match selected {
     298        73589 :             Some(search_result) => self
     299        73589 :                 .result
     300        73589 :                 .found
     301        73589 :                 .entry(search_result)
     302        73589 :                 .or_default()
     303        73589 :                 .add_range(covered_range),
     304          932 :             None => self.pad_range(covered_range),
     305              :         }
     306        74521 :     }
     307              : 
     308              :     /// Move to the next coverage change.
     309        74521 :     fn advance(&mut self, layer_type: &NextLayerType) {
     310        74521 :         match layer_type {
     311        17004 :             NextLayerType::Delta(_) => {
     312        17004 :                 let (_, layer) = self.delta_coverage.next().unwrap();
     313        17004 :                 self.current_delta = layer;
     314        17004 :             }
     315         1742 :             NextLayerType::Image(_) => {
     316         1742 :                 let (_, layer) = self.image_coverage.next().unwrap();
     317         1742 :                 self.current_image = layer;
     318         1742 :             }
     319        55775 :             NextLayerType::Both(_) => {
     320        55775 :                 let (_, image_layer) = self.image_coverage.next().unwrap();
     321        55775 :                 let (_, delta_layer) = self.delta_coverage.next().unwrap();
     322        55775 : 
     323        55775 :                 self.current_image = image_layer;
     324        55775 :                 self.current_delta = delta_layer;
     325        55775 :             }
     326              :         }
     327        74521 :     }
     328              : 
     329              :     /// Pick the next coverage change: the one at the lesser key or both if they're alligned.
     330       150116 :     fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
     331       150116 :         let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
     332       150116 :         let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
     333       150116 : 
     334       150116 :         match (next_delta_at, next_image_at) {
     335        75595 :             (None, None) => None,
     336        14482 :             (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
     337         1302 :             (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
     338        58737 :             (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
     339          440 :                 Some(NextLayerType::Image(*next_image_at))
     340              :             }
     341        58297 :             (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
     342         2522 :                 Some(NextLayerType::Delta(*next_delta_at))
     343              :             }
     344        55775 :             (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
     345              :         }
     346       150116 :     }
     347              : }
     348              : 
     349              : impl LayerMap {
     350              :     ///
     351              :     /// Find the latest layer (by lsn.end) that covers the given
     352              :     /// 'key', with lsn.start < 'end_lsn'.
     353              :     ///
     354              :     /// The caller of this function is the page reconstruction
     355              :     /// algorithm looking for the next relevant delta layer, or
     356              :     /// the terminal image layer. The caller will pass the lsn_floor
     357              :     /// value as end_lsn in the next call to search.
     358              :     ///
     359              :     /// If there's an image layer exactly below the given end_lsn,
     360              :     /// search should return that layer regardless if there are
     361              :     /// overlapping deltas.
     362              :     ///
     363              :     /// If the latest layer is a delta and there is an overlapping
     364              :     /// image with it below, the lsn_floor returned should be right
     365              :     /// above that image so we don't skip it in the search. Otherwise
     366              :     /// the lsn_floor returned should be the bottom of the delta layer
     367              :     /// because we should make as much progress down the lsn axis
     368              :     /// as possible. It's fine if this way we skip some overlapping
     369              :     /// deltas, because the delta we returned would contain the same
     370              :     /// wal content.
     371              :     ///
     372              :     /// TODO: This API is convoluted and inefficient. If the caller
     373              :     /// makes N search calls, we'll end up finding the same latest
     374              :     /// image layer N times. We should either cache the latest image
     375              :     /// layer result, or simplify the api to `get_latest_image` and
     376              :     /// `get_latest_delta`, and only call `get_latest_image` once.
     377              :     ///
     378              :     /// NOTE: This only searches the 'historic' layers, *not* the
     379              :     /// 'open' and 'frozen' layers!
     380              :     ///
     381       283725 :     pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
     382       283725 :         let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
     383       283609 :         let latest_delta = version.delta_coverage.query(key.to_i128());
     384       283609 :         let latest_image = version.image_coverage.query(key.to_i128());
     385       283609 : 
     386       283609 :         Self::select_layer(latest_delta, latest_image, end_lsn)
     387       283725 :     }
     388              : 
     389       358130 :     fn select_layer(
     390       358130 :         delta_layer: Option<Arc<PersistentLayerDesc>>,
     391       358130 :         image_layer: Option<Arc<PersistentLayerDesc>>,
     392       358130 :         end_lsn: Lsn,
     393       358130 :     ) -> Option<SearchResult> {
     394       358130 :         assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
     395       358130 :         assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
     396              : 
     397       358130 :         match (delta_layer, image_layer) {
     398        11600 :             (None, None) => None,
     399        35788 :             (None, Some(image)) => {
     400        35788 :                 let lsn_floor = image.get_lsn_range().start;
     401        35788 :                 Some(SearchResult {
     402        35788 :                     layer: image,
     403        35788 :                     lsn_floor,
     404        35788 :                 })
     405              :             }
     406        91230 :             (Some(delta), None) => {
     407        91230 :                 let lsn_floor = delta.get_lsn_range().start;
     408        91230 :                 Some(SearchResult {
     409        91230 :                     layer: delta,
     410        91230 :                     lsn_floor,
     411        91230 :                 })
     412              :             }
     413       219512 :             (Some(delta), Some(image)) => {
     414       219512 :                 let img_lsn = image.get_lsn_range().start;
     415       219512 :                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
     416       219512 :                 let image_exact_match = img_lsn + 1 == end_lsn;
     417       219512 :                 if image_is_newer || image_exact_match {
     418        38238 :                     Some(SearchResult {
     419        38238 :                         layer: image,
     420        38238 :                         lsn_floor: img_lsn,
     421        38238 :                     })
     422              :                 } else {
     423       181274 :                     let lsn_floor =
     424       181274 :                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
     425       181274 :                     Some(SearchResult {
     426       181274 :                         layer: delta,
     427       181274 :                         lsn_floor,
     428       181274 :                     })
     429              :                 }
     430              :             }
     431              :         }
     432       358130 :     }
     433              : 
     434        75633 :     pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
     435        75633 :         let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
     436        75595 :             Some(version) => version,
     437              :             None => {
     438           38 :                 let mut result = RangeSearchResult::new();
     439           38 :                 result.not_found.add_range(key_range);
     440           38 :                 return result;
     441              :             }
     442              :         };
     443              : 
     444        75595 :         let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
     445        75595 :         let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
     446        75595 :         let image_changes = version.image_coverage.range_overlaps(&raw_range);
     447        75595 : 
     448        75595 :         let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
     449        75595 :         collector.collect()
     450        75633 :     }
     451              : 
     452              :     /// Start a batch of updates, applied on drop
     453         1599 :     pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
     454         1599 :         BatchedUpdates { layer_map: self }
     455         1599 :     }
     456              : 
     457              :     ///
     458              :     /// Insert an on-disk layer
     459              :     ///
     460              :     /// Helper function for BatchedUpdates::insert_historic
     461              :     ///
     462              :     /// TODO(chi): remove L generic so that we do not need to pass layer object.
     463         1567 :     pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
     464         1567 :         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
     465         1567 : 
     466         1567 :         if Self::is_l0(&layer_desc) {
     467          980 :             self.l0_delta_layers.push(layer_desc.clone().into());
     468          980 :         }
     469              : 
     470         1567 :         self.historic.insert(
     471         1567 :             historic_layer_coverage::LayerKey::from(&layer_desc),
     472         1567 :             layer_desc.into(),
     473         1567 :         );
     474         1567 :     }
     475              : 
     476              :     ///
     477              :     /// Remove an on-disk layer from the map.
     478              :     ///
     479              :     /// Helper function for BatchedUpdates::remove_historic
     480              :     ///
     481          434 :     pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
     482          434 :         self.historic
     483          434 :             .remove(historic_layer_coverage::LayerKey::from(layer_desc));
     484          434 :         let layer_key = layer_desc.key();
     485          434 :         if Self::is_l0(layer_desc) {
     486          404 :             let len_before = self.l0_delta_layers.len();
     487          404 :             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
     488         5524 :             l0_delta_layers.retain(|other| other.key() != layer_key);
     489          404 :             self.l0_delta_layers = l0_delta_layers;
     490          404 :             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
     491          404 :             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
     492          404 :             // vtable) pairs.
     493          404 :             assert_eq!(
     494          404 :                 self.l0_delta_layers.len(),
     495          404 :                 len_before - 1,
     496            0 :                 "failed to locate removed historic layer from l0_delta_layers"
     497              :             );
     498           30 :         }
     499          434 :     }
     500              : 
     501              :     /// Helper function for BatchedUpdates::drop.
     502         1601 :     pub(self) fn flush_updates(&mut self) {
     503         1601 :         self.historic.rebuild();
     504         1601 :     }
     505              : 
     506              :     /// Is there a newer image layer for given key- and LSN-range? Or a set
     507              :     /// of image layers within the specified lsn range that cover the entire
     508              :     /// specified key range?
     509              :     ///
     510              :     /// This is used for garbage collection, to determine if an old layer can
     511              :     /// be deleted.
     512        11650 :     pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
     513        11650 :         if key.is_empty() {
     514              :             // Vacuously true. There's a newer image for all 0 of the kerys in the range.
     515            0 :             return true;
     516        11650 :         }
     517              : 
     518        11650 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     519        11650 :             Some(v) => v,
     520            0 :             None => return false,
     521              :         };
     522              : 
     523        11650 :         let start = key.start.to_i128();
     524        11650 :         let end = key.end.to_i128();
     525        11650 : 
     526        11748 :         let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
     527        11718 :             Some(layer) => layer.get_lsn_range().start >= lsn.start,
     528           30 :             None => false,
     529        11748 :         };
     530              : 
     531              :         // Check the start is covered
     532        11650 :         if !layer_covers(version.image_coverage.query(start)) {
     533        11612 :             return false;
     534           38 :         }
     535              : 
     536              :         // Check after all changes of coverage
     537           98 :         for (_, change_val) in version.image_coverage.range(start..end) {
     538           98 :             if !layer_covers(change_val) {
     539           30 :                 return false;
     540           68 :             }
     541              :         }
     542              : 
     543            8 :         true
     544        11650 :     }
     545              : 
     546          773 :     pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
     547          773 :         self.historic.iter()
     548          773 :     }
     549              : 
     550              :     /// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
     551          288 :     pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
     552          288 :     where
     553          288 :         Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
     554          288 :     {
     555          288 :         if let Some(open) = &self.open_layer {
     556           28 :             if pred(open) {
     557           14 :                 return Some(open.clone());
     558           14 :             }
     559          260 :         }
     560              : 
     561          274 :         self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
     562          288 :     }
     563              : 
     564              :     ///
     565              :     /// Divide the whole given range of keys into sub-ranges based on the latest
     566              :     /// image layer that covers each range at the specified lsn (inclusive).
     567              :     /// This is used when creating  new image layers.
     568           14 :     pub fn image_coverage(
     569           14 :         &self,
     570           14 :         key_range: &Range<Key>,
     571           14 :         lsn: Lsn,
     572           14 :     ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
     573           14 :         let version = match self.historic.get().unwrap().get_version(lsn.0) {
     574           14 :             Some(v) => v,
     575            0 :             None => return vec![],
     576              :         };
     577              : 
     578           14 :         let start = key_range.start.to_i128();
     579           14 :         let end = key_range.end.to_i128();
     580           14 : 
     581           14 :         // Initialize loop variables
     582           14 :         let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
     583           14 :         let mut current_key = start;
     584           14 :         let mut current_val = version.image_coverage.query(start);
     585              : 
     586              :         // Loop through the change events and push intervals
     587           14 :         for (change_key, change_val) in version.image_coverage.range(start..end) {
     588            0 :             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     589            0 :             coverage.push((kr, current_val.take()));
     590            0 :             current_key = change_key;
     591            0 :             current_val.clone_from(&change_val);
     592            0 :         }
     593              : 
     594              :         // Add the final interval
     595           14 :         let kr = Key::from_i128(current_key)..Key::from_i128(end);
     596           14 :         coverage.push((kr, current_val.take()));
     597           14 : 
     598           14 :         coverage
     599           14 :     }
     600              : 
     601         2309 :     pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
     602         2309 :         layer.get_key_range() == (Key::MIN..Key::MAX)
     603         2309 :     }
     604              : 
     605              :     /// This function determines which layers are counted in `count_deltas`:
     606              :     /// layers that should count towards deciding whether or not to reimage
     607              :     /// a certain partition range.
     608              :     ///
     609              :     /// There are two kinds of layers we currently consider reimage-worthy:
     610              :     ///
     611              :     /// Case 1: Non-L0 layers are currently reimage-worthy by default.
     612              :     /// TODO Some of these layers are very sparse and cover the entire key
     613              :     ///      range. Replacing 256MB of data (or less!) with terabytes of
     614              :     ///      images doesn't seem wise. We need a better heuristic, possibly
     615              :     ///      based on some of these factors:
     616              :     ///      a) whether this layer has any wal in this partition range
     617              :     ///      b) the size of the layer
     618              :     ///      c) the number of images needed to cover it
     619              :     ///      d) the estimated time until we'll have to reimage over it for GC
     620              :     ///
     621              :     /// Case 2: Since L0 layers by definition cover the entire key space, we consider
     622              :     /// them reimage-worthy only when the entire key space can be covered by very few
     623              :     /// images (currently 1).
     624              :     /// TODO The optimal number should probably be slightly higher than 1, but to
     625              :     ///      implement that we need to plumb a lot more context into this function
     626              :     ///      than just the current partition_range.
     627            0 :     pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
     628            0 :         // Case 1
     629            0 :         if !Self::is_l0(layer) {
     630            0 :             return true;
     631            0 :         }
     632            0 : 
     633            0 :         // Case 2
     634            0 :         if partition_range == &(Key::MIN..Key::MAX) {
     635            0 :             return true;
     636            0 :         }
     637            0 : 
     638            0 :         false
     639            0 :     }
     640              : 
     641              :     /// Count the height of the tallest stack of reimage-worthy deltas
     642              :     /// in this 2d region.
     643              :     ///
     644              :     /// If `limit` is provided we don't try to count above that number.
     645              :     ///
     646              :     /// This number is used to compute the largest number of deltas that
     647              :     /// we'll need to visit for any page reconstruction in this region.
     648              :     /// We use this heuristic to decide whether to create an image layer.
     649           14 :     pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
     650           14 :         // We get the delta coverage of the region, and for each part of the coverage
     651           14 :         // we recurse right underneath the delta. The recursion depth is limited by
     652           14 :         // the largest result this function could return, which is in practice between
     653           14 :         // 3 and 10 (since we usually try to create an image when the number gets larger).
     654           14 : 
     655           14 :         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
     656            0 :             return 0;
     657           14 :         }
     658              : 
     659           14 :         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
     660           14 :             Some(v) => v,
     661            0 :             None => return 0,
     662              :         };
     663              : 
     664           14 :         let start = key.start.to_i128();
     665           14 :         let end = key.end.to_i128();
     666           14 : 
     667           14 :         // Initialize loop variables
     668           14 :         let mut max_stacked_deltas = 0;
     669           14 :         let mut current_key = start;
     670           14 :         let mut current_val = version.delta_coverage.query(start);
     671              : 
     672              :         // Loop through the delta coverage and recurse on each part
     673           14 :         for (change_key, change_val) in version.delta_coverage.range(start..end) {
     674              :             // If there's a relevant delta in this part, add 1 and recurse down
     675            0 :             if let Some(val) = &current_val {
     676            0 :                 if val.get_lsn_range().end > lsn.start {
     677            0 :                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
     678            0 :                     let lr = lsn.start..val.get_lsn_range().start;
     679            0 :                     if !kr.is_empty() {
     680            0 :                         let base_count = Self::is_reimage_worthy(val, key) as usize;
     681            0 :                         let new_limit = limit.map(|l| l - base_count);
     682            0 :                         let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     683            0 :                         max_stacked_deltas = std::cmp::max(
     684            0 :                             max_stacked_deltas,
     685            0 :                             base_count + max_stacked_deltas_underneath,
     686            0 :                         );
     687            0 :                     }
     688            0 :                 }
     689            0 :             }
     690              : 
     691            0 :             current_key = change_key;
     692            0 :             current_val.clone_from(&change_val);
     693              :         }
     694              : 
     695              :         // Consider the last part
     696           14 :         if let Some(val) = &current_val {
     697            0 :             if val.get_lsn_range().end > lsn.start {
     698            0 :                 let kr = Key::from_i128(current_key)..Key::from_i128(end);
     699            0 :                 let lr = lsn.start..val.get_lsn_range().start;
     700            0 : 
     701            0 :                 if !kr.is_empty() {
     702            0 :                     let base_count = Self::is_reimage_worthy(val, key) as usize;
     703            0 :                     let new_limit = limit.map(|l| l - base_count);
     704            0 :                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
     705            0 :                     max_stacked_deltas = std::cmp::max(
     706            0 :                         max_stacked_deltas,
     707            0 :                         base_count + max_stacked_deltas_underneath,
     708            0 :                     );
     709            0 :                 }
     710            0 :             }
     711           14 :         }
     712              : 
     713           14 :         max_stacked_deltas
     714           14 :     }
     715              : 
     716              :     /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
     717              :     ///
     718              :     /// The `partition_range` argument is used as context for the reimage-worthiness decision.
     719              :     ///
     720              :     /// Used as a helper for correctness checks only. Performance not critical.
     721            0 :     pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
     722            0 :         match self.search(key, lsn) {
     723            0 :             Some(search_result) => {
     724            0 :                 if search_result.layer.is_incremental() {
     725            0 :                     (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
     726            0 :                         + self.get_difficulty(search_result.lsn_floor, key, partition_range)
     727              :                 } else {
     728            0 :                     0
     729              :                 }
     730              :             }
     731            0 :             None => 0,
     732              :         }
     733            0 :     }
     734              : 
     735              :     /// Used for correctness checking. Results are expected to be identical to
     736              :     /// self.get_difficulty_map. Assumes self.search is correct.
     737            0 :     pub fn get_difficulty_map_bruteforce(
     738            0 :         &self,
     739            0 :         lsn: Lsn,
     740            0 :         partitioning: &KeyPartitioning,
     741            0 :     ) -> Vec<usize> {
     742            0 :         // Looking at the difficulty as a function of key, it could only increase
     743            0 :         // when a delta layer starts or an image layer ends. Therefore it's sufficient
     744            0 :         // to check the difficulties at:
     745            0 :         // - the key.start for each non-empty part range
     746            0 :         // - the key.start for each delta
     747            0 :         // - the key.end for each image
     748            0 :         let keys_iter: Box<dyn Iterator<Item = Key>> = {
     749            0 :             let mut keys: Vec<Key> = self
     750            0 :                 .iter_historic_layers()
     751            0 :                 .map(|layer| {
     752            0 :                     if layer.is_incremental() {
     753            0 :                         layer.get_key_range().start
     754              :                     } else {
     755            0 :                         layer.get_key_range().end
     756              :                     }
     757            0 :                 })
     758            0 :                 .collect();
     759            0 :             keys.sort();
     760            0 :             Box::new(keys.into_iter())
     761            0 :         };
     762            0 :         let mut keys_iter = keys_iter.peekable();
     763            0 : 
     764            0 :         // Iter the partition and keys together and query all the necessary
     765            0 :         // keys, computing the max difficulty for each part.
     766            0 :         partitioning
     767            0 :             .parts
     768            0 :             .iter()
     769            0 :             .map(|part| {
     770            0 :                 let mut difficulty = 0;
     771              :                 // Partition ranges are assumed to be sorted and disjoint
     772              :                 // TODO assert it
     773            0 :                 for range in &part.ranges {
     774            0 :                     if !range.is_empty() {
     775            0 :                         difficulty =
     776            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
     777            0 :                     }
     778            0 :                     while let Some(key) = keys_iter.peek() {
     779            0 :                         if key >= &range.end {
     780            0 :                             break;
     781            0 :                         }
     782            0 :                         let key = keys_iter.next().unwrap();
     783            0 :                         if key < range.start {
     784            0 :                             continue;
     785            0 :                         }
     786            0 :                         difficulty =
     787            0 :                             std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
     788              :                     }
     789              :                 }
     790            0 :                 difficulty
     791            0 :             })
     792            0 :             .collect()
     793            0 :     }
     794              : 
     795              :     /// For each part of a keyspace partitioning, return the maximum number of layers
     796              :     /// that would be needed for page reconstruction in that part at the given LSN.
     797              :     ///
     798              :     /// If `limit` is provided we don't try to count above that number.
     799              :     ///
     800              :     /// This method is used to decide where to create new image layers. Computing the
     801              :     /// result for the entire partitioning at once allows this function to be more
     802              :     /// efficient, and further optimization is possible by using iterators instead,
     803              :     /// to allow early return.
     804              :     ///
     805              :     /// TODO actually use this method instead of count_deltas. Currently we only use
     806              :     ///      it for benchmarks.
     807            0 :     pub fn get_difficulty_map(
     808            0 :         &self,
     809            0 :         lsn: Lsn,
     810            0 :         partitioning: &KeyPartitioning,
     811            0 :         limit: Option<usize>,
     812            0 :     ) -> Vec<usize> {
     813            0 :         // TODO This is a naive implementation. Perf improvements to do:
     814            0 :         // 1. Instead of calling self.image_coverage and self.count_deltas,
     815            0 :         //    iterate the image and delta coverage only once.
     816            0 :         partitioning
     817            0 :             .parts
     818            0 :             .iter()
     819            0 :             .map(|part| {
     820            0 :                 let mut difficulty = 0;
     821            0 :                 for range in &part.ranges {
     822            0 :                     if limit == Some(difficulty) {
     823            0 :                         break;
     824            0 :                     }
     825            0 :                     for (img_range, last_img) in self.image_coverage(range, lsn) {
     826            0 :                         if limit == Some(difficulty) {
     827            0 :                             break;
     828            0 :                         }
     829            0 :                         let img_lsn = if let Some(last_img) = last_img {
     830            0 :                             last_img.get_lsn_range().end
     831              :                         } else {
     832            0 :                             Lsn(0)
     833              :                         };
     834              : 
     835            0 :                         if img_lsn < lsn {
     836            0 :                             let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
     837            0 :                             difficulty = std::cmp::max(difficulty, num_deltas);
     838            0 :                         }
     839              :                     }
     840              :                 }
     841            0 :                 difficulty
     842            0 :             })
     843            0 :             .collect()
     844            0 :     }
     845              : 
     846              :     /// Return all L0 delta layers
     847          370 :     pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
     848          370 :         Ok(self.l0_delta_layers.to_vec())
     849          370 :     }
     850              : 
     851              :     /// debugging function to print out the contents of the layer map
     852              :     #[allow(unused)]
     853            2 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     854            2 :         println!("Begin dump LayerMap");
     855            2 : 
     856            2 :         println!("open_layer:");
     857            2 :         if let Some(open_layer) = &self.open_layer {
     858            0 :             open_layer.dump(verbose, ctx).await?;
     859            2 :         }
     860              : 
     861            2 :         println!("frozen_layers:");
     862            2 :         for frozen_layer in self.frozen_layers.iter() {
     863            0 :             frozen_layer.dump(verbose, ctx).await?;
     864              :         }
     865              : 
     866            2 :         println!("historic_layers:");
     867           12 :         for desc in self.iter_historic_layers() {
     868           12 :             desc.dump();
     869           12 :         }
     870            2 :         println!("End dump LayerMap");
     871            2 :         Ok(())
     872            2 :     }
     873              : }
     874              : 
     875              : #[cfg(test)]
     876              : mod tests {
     877              :     use pageserver_api::keyspace::KeySpace;
     878              : 
     879              :     use super::*;
     880              : 
     881              :     #[derive(Clone)]
     882              :     struct LayerDesc {
     883              :         key_range: Range<Key>,
     884              :         lsn_range: Range<Lsn>,
     885              :         is_delta: bool,
     886              :     }
     887              : 
     888            2 :     fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
     889            2 :         let mut layer_map = LayerMap::default();
     890              : 
     891           12 :         for layer in layers {
     892           10 :             layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
     893           10 :                 layer.key_range,
     894           10 :                 layer.lsn_range,
     895           10 :                 layer.is_delta,
     896           10 :             ));
     897           10 :         }
     898              : 
     899            2 :         layer_map.flush_updates();
     900            2 :         layer_map
     901            2 :     }
     902              : 
     903         3540 :     fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
     904         3540 :         assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
     905         3540 :         let lhs: HashMap<SearchResult, KeySpace> = lhs
     906         3540 :             .found
     907         3540 :             .into_iter()
     908         8230 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
     909         3540 :             .collect();
     910         3540 :         let rhs: HashMap<SearchResult, KeySpace> = rhs
     911         3540 :             .found
     912         3540 :             .into_iter()
     913         8230 :             .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
     914         3540 :             .collect();
     915         3540 : 
     916         3540 :         assert_eq!(lhs, rhs);
     917         3540 :     }
     918              : 
     919              :     #[cfg(test)]
     920         3540 :     fn brute_force_range_search(
     921         3540 :         layer_map: &LayerMap,
     922         3540 :         key_range: Range<Key>,
     923         3540 :         end_lsn: Lsn,
     924         3540 :     ) -> RangeSearchResult {
     925         3540 :         let mut range_search_result = RangeSearchResult::new();
     926         3540 : 
     927         3540 :         let mut key = key_range.start;
     928        75520 :         while key != key_range.end {
     929        71980 :             let res = layer_map.search(key, end_lsn);
     930        71980 :             match res {
     931        61320 :                 Some(res) => {
     932        61320 :                     range_search_result
     933        61320 :                         .found
     934        61320 :                         .entry(res)
     935        61320 :                         .or_default()
     936        61320 :                         .add_key(key);
     937        61320 :                 }
     938        10660 :                 None => {
     939        10660 :                     range_search_result.not_found.add_key(key);
     940        10660 :                 }
     941              :             }
     942              : 
     943        71980 :             key = key.next();
     944              :         }
     945              : 
     946         3540 :         range_search_result
     947         3540 :     }
     948              : 
     949              :     #[test]
     950            2 :     fn ranged_search_on_empty_layer_map() {
     951            2 :         let layer_map = LayerMap::default();
     952            2 :         let range = Key::from_i128(100)..Key::from_i128(200);
     953            2 : 
     954            2 :         let res = layer_map.range_search(range.clone(), Lsn(100));
     955            2 :         assert_eq!(
     956            2 :             res.not_found.to_keyspace(),
     957            2 :             KeySpace {
     958            2 :                 ranges: vec![range]
     959            2 :             }
     960            2 :         );
     961            2 :     }
     962              : 
     963              :     #[test]
     964            2 :     fn ranged_search() {
     965            2 :         let layers = vec![
     966            2 :             LayerDesc {
     967            2 :                 key_range: Key::from_i128(15)..Key::from_i128(50),
     968            2 :                 lsn_range: Lsn(0)..Lsn(5),
     969            2 :                 is_delta: false,
     970            2 :             },
     971            2 :             LayerDesc {
     972            2 :                 key_range: Key::from_i128(10)..Key::from_i128(20),
     973            2 :                 lsn_range: Lsn(5)..Lsn(20),
     974            2 :                 is_delta: true,
     975            2 :             },
     976            2 :             LayerDesc {
     977            2 :                 key_range: Key::from_i128(15)..Key::from_i128(25),
     978            2 :                 lsn_range: Lsn(20)..Lsn(30),
     979            2 :                 is_delta: true,
     980            2 :             },
     981            2 :             LayerDesc {
     982            2 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
     983            2 :                 lsn_range: Lsn(25)..Lsn(35),
     984            2 :                 is_delta: true,
     985            2 :             },
     986            2 :             LayerDesc {
     987            2 :                 key_range: Key::from_i128(35)..Key::from_i128(40),
     988            2 :                 lsn_range: Lsn(35)..Lsn(40),
     989            2 :                 is_delta: false,
     990            2 :             },
     991            2 :         ];
     992            2 : 
     993            2 :         let layer_map = create_layer_map(layers.clone());
     994          122 :         for start in 0..60 {
     995         3540 :             for end in (start + 1)..60 {
     996         3540 :                 let range = Key::from_i128(start)..Key::from_i128(end);
     997         3540 :                 let result = layer_map.range_search(range.clone(), Lsn(100));
     998         3540 :                 let expected = brute_force_range_search(&layer_map, range, Lsn(100));
     999         3540 : 
    1000         3540 :                 assert_range_search_result_eq(result, expected);
    1001         3540 :             }
    1002              :         }
    1003            2 :     }
    1004              : }
        

Generated by: LCOV version 2.1-beta