LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 97.0 % 638 619
Test Date: 2023-09-06 10:18:01 Functions: 80.5 % 215 173

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use byteorder::{ReadBytesExt, BE};
      22              : use bytes::{BufMut, Bytes, BytesMut};
      23              : use either::Either;
      24              : use hex;
      25              : use std::{cmp::Ordering, io, result};
      26              : use thiserror::Error;
      27              : use tracing::error;
      28              : 
      29              : use crate::tenant::block_io::{BlockReader, BlockWriter};
      30              : 
      31              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      32              : pub const VALUE_SZ: usize = 5;
      33              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      34              : 
      35              : #[allow(dead_code)]
      36              : pub const PAGE_SZ: usize = 8192;
      37              : 
      38            0 : #[derive(Clone, Copy, Debug)]
      39              : struct Value([u8; VALUE_SZ]);
      40              : 
      41              : impl Value {
      42    281076334 :     fn from_slice(slice: &[u8]) -> Value {
      43    281076334 :         let mut b = [0u8; VALUE_SZ];
      44    281076334 :         b.copy_from_slice(slice);
      45    281076334 :         Value(b)
      46    281076334 :     }
      47              : 
      48     91821560 :     fn from_u64(x: u64) -> Value {
      49     91821560 :         assert!(x <= 0x007f_ffff_ffff);
      50     91821560 :         Value([
      51     91821560 :             (x >> 32) as u8,
      52     91821560 :             (x >> 24) as u8,
      53     91821560 :             (x >> 16) as u8,
      54     91821560 :             (x >> 8) as u8,
      55     91821560 :             x as u8,
      56     91821560 :         ])
      57     91821560 :     }
      58              : 
      59       177124 :     fn from_blknum(x: u32) -> Value {
      60       177124 :         Value([
      61       177124 :             0x80,
      62       177124 :             (x >> 24) as u8,
      63       177124 :             (x >> 16) as u8,
      64       177124 :             (x >> 8) as u8,
      65       177124 :             x as u8,
      66       177124 :         ])
      67       177124 :     }
      68              : 
      69              :     #[allow(dead_code)]
      70            0 :     fn is_offset(self) -> bool {
      71            0 :         self.0[0] & 0x80 != 0
      72            0 :     }
      73              : 
      74    251205646 :     fn to_u64(self) -> u64 {
      75    251205646 :         let b = &self.0;
      76    251205646 :         (b[0] as u64) << 32
      77    251205646 :             | (b[1] as u64) << 24
      78    251205646 :             | (b[2] as u64) << 16
      79    251205646 :             | (b[3] as u64) << 8
      80    251205646 :             | b[4] as u64
      81    251205646 :     }
      82              : 
      83     29867676 :     fn to_blknum(self) -> u32 {
      84     29867676 :         let b = &self.0;
      85     29867676 :         assert!(b[0] == 0x80);
      86     29867676 :         (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
      87     29867676 :     }
      88              : }
      89              : 
      90            0 : #[derive(Error, Debug)]
      91              : pub enum DiskBtreeError {
      92              :     #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
      93              :     AppendOverflow(u64),
      94              : 
      95              :     #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
      96              :     UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
      97              : 
      98              :     #[error("Could not push to new leaf node")]
      99              :     FailedToPushToNewLeafNode,
     100              : 
     101              :     #[error("IoError: {0}")]
     102              :     Io(#[from] io::Error),
     103              : }
     104              : 
     105              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     106              : 
     107              : /// This is the on-disk representation.
     108              : struct OnDiskNode<'a, const L: usize> {
     109              :     // Fixed-width fields
     110              :     num_children: u16,
     111              :     level: u8,
     112              :     prefix_len: u8,
     113              :     suffix_len: u8,
     114              : 
     115              :     // Variable-length fields. These are stored on-disk after the fixed-width
     116              :     // fields, in this order. In the in-memory representation, these point to
     117              :     // the right parts in the page buffer.
     118              :     prefix: &'a [u8],
     119              :     keys: &'a [u8],
     120              :     values: &'a [u8],
     121              : }
     122              : 
     123              : impl<'a, const L: usize> OnDiskNode<'a, L> {
     124              :     ///
     125              :     /// Interpret a PAGE_SZ page as a node.
     126              :     ///
     127     76221254 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     128     76221254 :         let mut cursor = std::io::Cursor::new(buf);
     129     76221254 :         let num_children = cursor.read_u16::<BE>()?;
     130     76221254 :         let level = cursor.read_u8()?;
     131     76221254 :         let prefix_len = cursor.read_u8()?;
     132     76221254 :         let suffix_len = cursor.read_u8()?;
     133              : 
     134     76221254 :         let mut off = cursor.position();
     135     76221254 :         let prefix_off = off as usize;
     136     76221254 :         off += prefix_len as u64;
     137     76221254 : 
     138     76221254 :         let keys_off = off as usize;
     139     76221254 :         let keys_len = num_children as usize * suffix_len as usize;
     140     76221254 :         off += keys_len as u64;
     141     76221254 : 
     142     76221254 :         let values_off = off as usize;
     143     76221254 :         let values_len = num_children as usize * VALUE_SZ;
     144     76221254 :         //off += values_len as u64;
     145     76221254 : 
     146     76221254 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     147     76221254 :         let keys = &buf[keys_off..keys_off + keys_len];
     148     76221254 :         let values = &buf[values_off..values_off + values_len];
     149     76221254 : 
     150     76221254 :         Ok(OnDiskNode {
     151     76221254 :             num_children,
     152     76221254 :             level,
     153     76221254 :             prefix_len,
     154     76221254 :             suffix_len,
     155     76221254 :             prefix,
     156     76221254 :             keys,
     157     76221254 :             values,
     158     76221254 :         })
     159     76221254 :     }
     160              : 
     161              :     ///
     162              :     /// Read a value at 'idx'
     163              :     ///
     164    281076760 :     fn value(&self, idx: usize) -> Value {
     165    281076760 :         let value_off = idx * VALUE_SZ;
     166    281076760 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     167    281076760 :         Value::from_slice(value_slice)
     168    281076760 :     }
     169              : 
     170     75773660 :     fn binary_search(
     171     75773660 :         &self,
     172     75773660 :         search_key: &[u8; L],
     173     75773660 :         keybuf: &mut [u8],
     174     75773660 :     ) -> result::Result<usize, usize> {
     175     75773660 :         let mut size = self.num_children as usize;
     176     75773660 :         let mut low = 0;
     177     75773660 :         let mut high = size;
     178    567082414 :         while low < high {
     179    492104295 :             let mid = low + size / 2;
     180    492104295 : 
     181    492104295 :             let key_off = mid * self.suffix_len as usize;
     182    492104295 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     183    492104295 :             // Does this match?
     184    492104295 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     185    492104295 : 
     186    492104295 :             let cmp = keybuf[..].cmp(search_key);
     187    492104295 : 
     188    492104295 :             if cmp == Ordering::Less {
     189    188113573 :                 low = mid + 1;
     190    303990722 :             } else if cmp == Ordering::Greater {
     191    303195181 :                 high = mid;
     192    303195181 :             } else {
     193       795541 :                 return Ok(mid);
     194              :             }
     195    491308754 :             size = high - low;
     196              :         }
     197     74978119 :         Err(low)
     198     75773660 :     }
     199              : }
     200              : 
     201              : ///
     202              : /// Public reader object, to search the tree.
     203              : ///
     204              : pub struct DiskBtreeReader<R, const L: usize>
     205              : where
     206              :     R: BlockReader,
     207              : {
     208              :     start_blk: u32,
     209              :     root_blk: u32,
     210              :     reader: R,
     211              : }
     212              : 
     213     75773588 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     214              : pub enum VisitDirection {
     215              :     Forwards,
     216              :     Backwards,
     217              : }
     218              : 
     219              : impl<R, const L: usize> DiskBtreeReader<R, L>
     220              : where
     221              :     R: BlockReader,
     222              : {
     223     45700352 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     224     45700352 :         DiskBtreeReader {
     225     45700352 :             start_blk,
     226     45700352 :             root_blk,
     227     45700352 :             reader,
     228     45700352 :         }
     229     45700352 :     }
     230              : 
     231              :     ///
     232              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     233              :     ///
     234       860509 :     pub async fn get(&self, search_key: &[u8; L]) -> Result<Option<u64>> {
     235       860509 :         let mut result: Option<u64> = None;
     236       860509 :         self.visit(search_key, VisitDirection::Forwards, |key, value| {
     237       760497 :             if key == search_key {
     238       759492 :                 result = Some(value);
     239       759492 :             }
     240       760497 :             false
     241       860509 :         })
     242         2478 :         .await?;
     243       860509 :         Ok(result)
     244       860509 :     }
     245              : 
     246              :     ///
     247              :     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     248              :     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
     249              :     /// backwards)
     250              :     ///
     251     45905965 :     pub async fn visit<V>(
     252     45905965 :         &self,
     253     45905965 :         search_key: &[u8; L],
     254     45905965 :         dir: VisitDirection,
     255     45905965 :         mut visitor: V,
     256     45905965 :     ) -> Result<bool>
     257     45905965 :     where
     258     45905965 :         V: FnMut(&[u8], u64) -> bool,
     259     45905965 :     {
     260     45905965 :         let mut stack = Vec::new();
     261     45905965 :         stack.push((self.root_blk, None));
     262     45905965 :         let block_cursor = self.reader.block_cursor();
     263     76333529 :         while let Some((node_blknum, opt_iter)) = stack.pop() {
     264              :             // Locate the node.
     265     76218238 :             let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
     266              : 
     267     76218238 :             let node = OnDiskNode::deparse(node_buf.as_ref())?;
     268     76218238 :             let prefix_len = node.prefix_len as usize;
     269     76218238 :             let suffix_len = node.suffix_len as usize;
     270     76218238 : 
     271     76218238 :             assert!(node.num_children > 0);
     272              : 
     273     76218238 :             let mut keybuf = Vec::new();
     274     76218238 :             keybuf.extend(node.prefix);
     275     76218238 :             keybuf.resize(prefix_len + suffix_len, 0);
     276              : 
     277     76218238 :             let mut iter = if let Some(iter) = opt_iter {
     278       444578 :                 iter
     279     75773660 :             } else if dir == VisitDirection::Forwards {
     280              :                 // Locate the first match
     281      1552239 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     282       761027 :                     Ok(idx) => idx,
     283       791212 :                     Err(idx) => {
     284       791212 :                         if node.level == 0 {
     285              :                             // Imagine that the node contains the following keys:
     286              :                             //
     287              :                             // 1
     288              :                             // 3  <-- idx
     289              :                             // 5
     290              :                             //
     291              :                             // If the search key is '2' and there is exact match,
     292              :                             // the binary search would return the index of key
     293              :                             // '3'. That's cool, '3' is the first key to return.
     294       189230 :                             idx
     295              :                         } else {
     296              :                             // This is an internal page, so each key represents a lower
     297              :                             // bound for what's in the child page. If there is no exact
     298              :                             // match, we have to return the *previous* entry.
     299              :                             //
     300              :                             // 1  <-- return this
     301              :                             // 3  <-- idx
     302              :                             // 5
     303       601982 :                             idx.saturating_sub(1)
     304              :                         }
     305              :                     }
     306              :                 };
     307      1552239 :                 Either::Left(idx..node.num_children.into())
     308              :             } else {
     309     74221421 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     310        34514 :                     Ok(idx) => {
     311        34514 :                         // Exact match. That's the first entry to return, and walk
     312        34514 :                         // backwards from there.
     313        34514 :                         idx
     314              :                     }
     315     74186907 :                     Err(idx) => {
     316              :                         // No exact match. The binary search returned the index of the
     317              :                         // first key that's > search_key. Back off by one, and walk
     318              :                         // backwards from there.
     319     74186907 :                         if let Some(idx) = idx.checked_sub(1) {
     320     58705990 :                             idx
     321              :                         } else {
     322     15480917 :                             return Ok(false);
     323              :                         }
     324              :                     }
     325              :                 };
     326     58740504 :                 Either::Right((0..=idx).rev())
     327              :             };
     328              : 
     329              :             // idx points to the first match now. Keep going from there
     330    281633613 :             while let Some(idx) = iter.next() {
     331    281073744 :                 let key_off = idx * suffix_len;
     332    281073744 :                 let suffix = &node.keys[key_off..key_off + suffix_len];
     333    281073744 :                 keybuf[prefix_len..].copy_from_slice(suffix);
     334    281073744 :                 let value = node.value(idx);
     335    281073744 :                 #[allow(clippy::collapsible_if)]
     336    281073744 :                 if node.level == 0 {
     337              :                     // leaf
     338    251206049 :                     if !visitor(&keybuf, value.to_u64()) {
     339     30309757 :                         return Ok(false);
     340    220896292 :                     }
     341              :                 } else {
     342     29867695 :                     stack.push((node_blknum, Some(iter)));
     343     29867695 :                     stack.push((value.to_blknum(), None));
     344     29867695 :                     break;
     345              :                 }
     346              :             }
     347              :         }
     348       115291 :         Ok(true)
     349     45905965 :     }
     350              : 
     351              :     #[allow(dead_code)]
     352            5 :     pub async fn dump(&self) -> Result<()> {
     353            5 :         let mut stack = Vec::new();
     354            5 : 
     355            5 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     356            5 : 
     357            5 :         let block_cursor = self.reader.block_cursor();
     358              : 
     359         3021 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     360         3016 :             let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
     361         3016 :             let buf: &[u8] = blk.as_ref();
     362         3016 :             let node = OnDiskNode::<L>::deparse(buf)?;
     363              : 
     364         3016 :             if child_idx == 0 {
     365            9 :                 print!("{:indent$}", "", indent = depth * 2);
     366            9 :                 let path_prefix = stack
     367            9 :                     .iter()
     368            9 :                     .map(|(_blknum, path, ..)| path.as_str())
     369            9 :                     .collect::<String>();
     370            9 :                 println!(
     371            9 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     372            9 :                     hex::encode(node.prefix),
     373            9 :                     node.suffix_len
     374            9 :                 );
     375         3007 :             }
     376              : 
     377         3016 :             if child_idx + 1 < node.num_children {
     378         3007 :                 let key_off = key_off + node.suffix_len as usize;
     379         3007 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     380         3007 :             }
     381         3016 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     382         3016 :             let val = node.value(child_idx as usize);
     383         3016 : 
     384         3016 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     385         3016 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     386         3016 : 
     387         3016 :             if node.level > 0 {
     388            4 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     389         3012 :             }
     390              :         }
     391            5 :         Ok(())
     392            5 :     }
     393              : }
     394              : 
     395              : ///
     396              : /// Public builder object, for creating a new tree.
     397              : ///
     398              : /// Usage: Create a builder object by calling 'new', load all the data into the
     399              : /// tree by calling 'append' for each key-value pair, and then call 'finish'
     400              : ///
     401              : /// 'L' is the key length in bytes
     402              : pub struct DiskBtreeBuilder<W, const L: usize>
     403              : where
     404              :     W: BlockWriter,
     405              : {
     406              :     writer: W,
     407              : 
     408              :     ///
     409              :     /// `stack[0]` is the current root page, `stack.last()` is the leaf.
     410              :     ///
     411              :     /// We maintain the length of the stack to be always greater than zero.
     412              :     /// Two exceptions are:
     413              :     /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
     414              :     ///   So because other methods cannot see the intermediate state invariant still holds.
     415              :     /// 2. `Self::finish`. It consumes self and does not return it back,
     416              :     ///  which means that this is where the structure is destroyed.
     417              :     ///  Thus stack of zero length cannot be observed by other methods.
     418              :     stack: Vec<BuildNode<L>>,
     419              : 
     420              :     /// Last key that was appended to the tree. Used to sanity check that append
     421              :     /// is called in increasing key order.
     422              :     last_key: Option<[u8; L]>,
     423              : }
     424              : 
     425              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     426              : where
     427              :     W: BlockWriter,
     428              : {
     429        16632 :     pub fn new(writer: W) -> Self {
     430        16632 :         DiskBtreeBuilder {
     431        16632 :             writer,
     432        16632 :             last_key: None,
     433        16632 :             stack: vec![BuildNode::new(0)],
     434        16632 :         }
     435        16632 :     }
     436              : 
     437     91821788 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     438     91821788 :         if value > MAX_VALUE {
     439            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     440     91821788 :         }
     441     91821788 :         if let Some(last_key) = &self.last_key {
     442     91805156 :             if key <= last_key {
     443            1 :                 return Err(DiskBtreeError::UnsortedInput {
     444            1 :                     key: key.as_slice().into(),
     445            1 :                     last_key: last_key.as_slice().into(),
     446            1 :                 });
     447     91805155 :             }
     448        16632 :         }
     449     91821787 :         self.last_key = Some(*key);
     450     91821787 : 
     451     91821787 :         self.append_internal(key, Value::from_u64(value))
     452     91821788 :     }
     453              : 
     454     91998911 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     455     91998911 :         // Try to append to the current leaf buffer
     456     91998911 :         let last = self
     457     91998911 :             .stack
     458     91998911 :             .last_mut()
     459     91998911 :             .expect("should always have at least one item");
     460     91998911 :         let level = last.level;
     461     91998911 :         if last.push(key, value) {
     462     91663221 :             return Ok(());
     463       335690 :         }
     464       335690 : 
     465       335690 :         // It did not fit. Try to compress, and if it succeeds to make
     466       335690 :         // some room on the node, try appending to it again.
     467       335690 :         #[allow(clippy::collapsible_if)]
     468       335690 :         if last.compress() {
     469       169291 :             if last.push(key, value) {
     470       169226 :                 return Ok(());
     471           65 :             }
     472       166399 :         }
     473              : 
     474              :         // Could not append to the current leaf. Flush it and create a new one.
     475       166464 :         self.flush_node()?;
     476              : 
     477              :         // Replace the node we flushed with an empty one and append the new
     478              :         // key to it.
     479       166464 :         let mut last = BuildNode::new(level);
     480       166464 :         if !last.push(key, value) {
     481            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     482       166464 :         }
     483       166464 : 
     484       166464 :         self.stack.push(last);
     485       166464 : 
     486       166464 :         Ok(())
     487     91998911 :     }
     488              : 
     489              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     490              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     491              :     /// creates a new root page, increasing the height of the tree.
     492       177124 :     fn flush_node(&mut self) -> Result<()> {
     493       177124 :         // Get the current bottommost node in the stack and flush it to disk.
     494       177124 :         let last = self
     495       177124 :             .stack
     496       177124 :             .pop()
     497       177124 :             .expect("should always have at least one item");
     498       177124 :         let buf = last.pack();
     499       177124 :         let downlink_key = last.first_key();
     500       177124 :         let downlink_ptr = self.writer.write_blk(buf)?;
     501              : 
     502              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     503              :         // create a new root page, increasing the height of the tree.
     504       177124 :         if self.stack.is_empty() {
     505        10667 :             self.stack.push(BuildNode::new(last.level + 1));
     506       166457 :         }
     507       177124 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     508       177124 :     }
     509              : 
     510              :     ///
     511              :     /// Flushes everything to disk, and returns the block number of the root page.
     512              :     /// The caller must store the root block number "out-of-band", and pass it
     513              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     514              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     515              :     /// in the summary header)
     516              :     ///
     517        16624 :     pub fn finish(mut self) -> Result<(u32, W)> {
     518              :         // flush all levels, except the root.
     519        27284 :         while self.stack.len() > 1 {
     520        10660 :             self.flush_node()?;
     521              :         }
     522              : 
     523        16624 :         let root = self
     524        16624 :             .stack
     525        16624 :             .first()
     526        16624 :             .expect("by the check above we left one item there");
     527        16624 :         let buf = root.pack();
     528        16624 :         let root_blknum = self.writer.write_blk(buf)?;
     529              : 
     530        16624 :         Ok((root_blknum, self.writer))
     531        16624 :     }
     532              : 
     533      1751920 :     pub fn borrow_writer(&self) -> &W {
     534      1751920 :         &self.writer
     535      1751920 :     }
     536              : }
     537              : 
     538              : ///
     539              : /// BuildNode represesnts an incomplete page that we are appending to.
     540              : ///
     541            0 : #[derive(Clone, Debug)]
     542              : struct BuildNode<const L: usize> {
     543              :     num_children: u16,
     544              :     level: u8,
     545              :     prefix: Vec<u8>,
     546              :     suffix_len: usize,
     547              : 
     548              :     keys: Vec<u8>,
     549              :     values: Vec<u8>,
     550              : 
     551              :     size: usize, // physical size of this node, if it was written to disk like this
     552              : }
     553              : 
     554              : const NODE_SIZE: usize = PAGE_SZ;
     555              : 
     556              : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     557              : 
     558              : impl<const L: usize> BuildNode<L> {
     559       193763 :     fn new(level: u8) -> Self {
     560       193763 :         BuildNode {
     561       193763 :             num_children: 0,
     562       193763 :             level,
     563       193763 :             prefix: Vec::new(),
     564       193763 :             suffix_len: 0,
     565       193763 :             keys: Vec::new(),
     566       193763 :             values: Vec::new(),
     567       193763 :             size: NODE_HDR_SIZE,
     568       193763 :         }
     569       193763 :     }
     570              : 
     571              :     /// Try to append a key-value pair to this node. Returns 'true' on
     572              :     /// success, 'false' if the page was full or the key was
     573              :     /// incompatible with the prefix of the existing keys.
     574     92334666 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     575     92334666 :         // If we have already performed prefix-compression on the page,
     576     92334666 :         // check that the incoming key has the same prefix.
     577     92334666 :         if self.num_children > 0 {
     578              :             // does the prefix allow it?
     579     92140903 :             if !key.starts_with(&self.prefix) {
     580        48200 :                 return false;
     581     92092703 :             }
     582       193763 :         } else {
     583       193763 :             self.suffix_len = key.len();
     584       193763 :         }
     585              : 
     586              :         // Is the node too full?
     587     92286466 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     588       287555 :             return false;
     589     91998911 :         }
     590     91998911 : 
     591     91998911 :         // All clear
     592     91998911 :         self.num_children += 1;
     593     91998911 :         self.keys.extend(&key[self.prefix.len()..]);
     594     91998911 :         self.values.extend(value.0);
     595     91998911 : 
     596     91998911 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     597     91998911 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     598              : 
     599     91998911 :         self.size += self.suffix_len + VALUE_SZ;
     600     91998911 : 
     601     91998911 :         true
     602     92334666 :     }
     603              : 
     604              :     ///
     605              :     /// Perform prefix-compression.
     606              :     ///
     607              :     /// Returns 'true' on success, 'false' if no compression was possible.
     608              :     ///
     609       335690 :     fn compress(&mut self) -> bool {
     610       335690 :         let first_suffix = self.first_suffix();
     611       335690 :         let last_suffix = self.last_suffix();
     612       335690 : 
     613       335690 :         // Find the common prefix among all keys
     614       335690 :         let mut prefix_len = 0;
     615      3381974 :         while prefix_len < self.suffix_len {
     616      3381974 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     617       335690 :                 break;
     618      3046284 :             }
     619      3046284 :             prefix_len += 1;
     620              :         }
     621       335690 :         if prefix_len == 0 {
     622       166399 :             return false;
     623       169291 :         }
     624       169291 : 
     625       169291 :         // Can compress. Rewrite the keys without the common prefix.
     626       169291 :         self.prefix.extend(&self.keys[..prefix_len]);
     627       169291 : 
     628       169291 :         let mut new_keys = Vec::new();
     629       169291 :         let mut key_off = 0;
     630     44891963 :         while key_off < self.keys.len() {
     631     44722672 :             let next_key_off = key_off + self.suffix_len;
     632     44722672 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     633     44722672 :             key_off = next_key_off;
     634     44722672 :         }
     635       169291 :         self.keys = new_keys;
     636       169291 :         self.suffix_len -= prefix_len;
     637       169291 : 
     638       169291 :         self.size -= prefix_len * self.num_children as usize;
     639       169291 :         self.size += prefix_len;
     640       169291 : 
     641       169291 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     642       169291 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     643              : 
     644       169291 :         true
     645       335690 :     }
     646              : 
     647              :     ///
     648              :     /// Serialize the node to on-disk format.
     649              :     ///
     650       193748 :     fn pack(&self) -> Bytes {
     651       193748 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     652       193748 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     653       193748 :         assert!(self.num_children > 0);
     654              : 
     655       193748 :         let mut buf = BytesMut::new();
     656       193748 : 
     657       193748 :         buf.put_u16(self.num_children);
     658       193748 :         buf.put_u8(self.level);
     659       193748 :         buf.put_u8(self.prefix.len() as u8);
     660       193748 :         buf.put_u8(self.suffix_len as u8);
     661       193748 :         buf.put(&self.prefix[..]);
     662       193748 :         buf.put(&self.keys[..]);
     663       193748 :         buf.put(&self.values[..]);
     664       193748 : 
     665       193748 :         assert!(buf.len() == self.size);
     666              : 
     667       193748 :         assert!(buf.len() <= PAGE_SZ);
     668       193748 :         buf.resize(PAGE_SZ, 0);
     669       193748 :         buf.freeze()
     670       193748 :     }
     671              : 
     672       512814 :     fn first_suffix(&self) -> &[u8] {
     673       512814 :         &self.keys[..self.suffix_len]
     674       512814 :     }
     675       335690 :     fn last_suffix(&self) -> &[u8] {
     676       335690 :         &self.keys[self.keys.len() - self.suffix_len..]
     677       335690 :     }
     678              : 
     679              :     /// Return the full first key of the page, including the prefix
     680       177124 :     fn first_key(&self) -> [u8; L] {
     681       177124 :         let mut key = [0u8; L];
     682       177124 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     683       177124 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     684       177124 :         key
     685       177124 :     }
     686              : }
     687              : 
// Unit tests for the B-tree builder and reader. They write to and read from
// `TestDisk`, an in-memory block store defined below.
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
    use rand::Rng;
    use std::collections::BTreeMap;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// In-memory block device for the tests: `write_blk` appends a block and
    /// returns its index as the block number; `read_blk` returns a copy of the
    /// block stored at that index.
    #[derive(Clone, Default)]
    pub(crate) struct TestDisk {
        blocks: Vec<Bytes>,
    }
    impl TestDisk {
        fn new() -> Self {
            Self::default()
        }
        /// Copy block `blknum` into a fresh page-sized buffer.
        ///
        /// Panics if `blknum` is out of range, or if the stored block is not
        /// exactly `PAGE_SZ` bytes long (`copy_from_slice` requires equal lengths).
        pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
            let mut buf = [0u8; PAGE_SZ];
            buf.copy_from_slice(&self.blocks[blknum as usize]);
            Ok(std::sync::Arc::new(buf).into())
        }
    }
    impl BlockReader for TestDisk {
        fn block_cursor(&self) -> BlockCursor<'_> {
            BlockCursor::new(BlockReaderRef::TestDisk(self))
        }
    }
    impl BlockWriter for &mut TestDisk {
        // Blocks are numbered in write order, starting from 0.
        fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
            let blknum = self.blocks.len();
            self.blocks.push(buf);
            Ok(blknum as u32)
        }
    }

    /// Builds a small tree from eight hand-picked 6-byte keys that share
    /// prefixes, then checks `get` plus forward, backward and full `visit` scans.
    #[tokio::test]
    async fn basic() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);

        let all_keys: Vec<&[u8; 6]> = vec![
            b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
        ];
        // Each key's value is its position in `all_keys`.
        let all_data: Vec<(&[u8; 6], u64)> = all_keys
            .iter()
            .enumerate()
            .map(|(idx, key)| (*key, idx as u64))
            .collect();
        for (key, val) in all_data.iter() {
            writer.append(key, *val)?;
        }

        // The builder borrowed `&mut disk`; once the returned writer is no
        // longer used, `disk` can be moved into the reader.
        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        // Test the `get` function on all the keys.
        for (key, val) in all_data.iter() {
            assert_eq!(reader.get(key).await?, Some(*val));
        }
        // And on some keys that don't exist
        assert_eq!(reader.get(b"aaaaaa").await?, None);
        assert_eq!(reader.get(b"zzzzzz").await?, None);
        assert_eq!(reader.get(b"xaaabx").await?, None);

        // Test search with `visit` function: a forward scan is expected to
        // yield every key >= `search_key`, in ascending order.
        let search_key = b"xabaaa";
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] >= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();

        let mut data = Vec::new();
        reader
            .visit(search_key, VisitDirection::Forwards, |key, value| {
                data.push((key.to_vec(), value));
                // Returning `true` keeps the scan going.
                true
            })
            .await?;
        assert_eq!(data, expected);

        // Test a backwards scan: every key <= `search_key`, in descending order.
        let mut expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] <= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        expected.reverse();
        let mut data = Vec::new();
        reader
            .visit(search_key, VisitDirection::Backwards, |key, value| {
                data.push((key.to_vec(), value));
                true
            })
            .await?;
        assert_eq!(data, expected);

        // Backward scan where nothing matches: "aaaaaa" sorts before every
        // stored key, so the callback must never be invoked.
        reader
            .visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
                panic!("found unexpected key {}: {}", hex::encode(key), value);
            })
            .await?;

        // Full scan, starting from the all-zero key
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        let mut data = Vec::new();
        reader
            .visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
                data.push((key.to_vec(), value));
                true
            })
            .await?;
        assert_eq!(data, expected);

        Ok(())
    }

    /// Stores 1000 keys and compares `get` and bounded `visit` scans from
    /// every search key against a `BTreeMap` model of the same data.
    #[tokio::test]
    async fn lots_of_keys() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);

        const NUM_KEYS: u64 = 1000;

        // Reference model of what the tree should contain.
        let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();

        // Keys are the odd numbers 1, 3, ..., 1999 encoded as big-endian u64,
        // so every even search key falls between (or outside) stored keys.
        for idx in 0..NUM_KEYS {
            let key_int: u64 = 1 + idx * 2;
            let key = u64::to_be_bytes(key_int);
            writer.append(&key, idx)?;

            all_data.insert(key_int, idx);
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        use std::sync::Mutex;

        // Collects every visited entry into `result` and asks `visit` to
        // continue until `result` holds `limit` entries. The closure captures
        // `result` and `limit` by shared reference only (mutation goes through
        // the Mutex / atomic), so it is `Copy` and can be passed by value to
        // each `visit` call below.
        let result = Mutex::new(Vec::new());
        let limit: AtomicUsize = AtomicUsize::new(10);
        let take_ten = |key: &[u8], value: u64| {
            let mut keybuf = [0u8; 8];
            keybuf.copy_from_slice(key);
            let key_int = u64::from_be_bytes(keybuf);

            let mut result = result.lock().unwrap();
            result.push((key_int, value));

            // keep going until we have `limit` matches (10 unless changed below)
            result.len() < limit.load(Ordering::Relaxed)
        };

        // Search keys cover every stored key, every gap, and a few values
        // past the largest stored key.
        for search_key_int in 0..(NUM_KEYS * 2 + 10) {
            let search_key = u64::to_be_bytes(search_key_int);
            assert_eq!(
                reader.get(&search_key).await?,
                all_data.get(&search_key_int).cloned()
            );

            // Test a forward scan starting with this key
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Forwards, take_ten)
                .await?;
            let expected = all_data
                .range(search_key_int..)
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);

            // And a backwards scan
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Backwards, take_ten)
                .await?;
            let expected = all_data
                .range(..=search_key_int)
                .rev()
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);
        }

        // full forward scan; `limit` is raised so `take_ten` never stops it early
        let search_key = u64::to_be_bytes(0);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Forwards, take_ten)
            .await?;
        let expected = all_data
            .iter()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        // full backwards scan, starting from the largest possible key
        let search_key = u64::to_be_bytes(u64::MAX);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Backwards, take_ten)
            .await?;
        let expected = all_data
            .iter()
            .rev()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        Ok(())
    }

    /// Builds a tree from randomly generated 16-byte keys and checks `get`
    /// for stored keys, random (mostly absent) keys and the u128 extremes.
    ///
    /// NOTE(review): `rand::thread_rng()` is not seeded, so each run uses
    /// different data and a failure may not be reproducible.
    #[tokio::test]
    async fn random_data() -> Result<()> {
        // Generate random keys with exponential distribution, to
        // exercise the prefix compression
        const NUM_KEYS: usize = 100000;
        let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
        for idx in 0..NUM_KEYS {
            // `u` may be exactly 0.0; then `t` is +inf and the `as u128` cast
            // saturates to u128::MAX.
            let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
            let t = -(f64::ln(u));
            let key_int = (t * 1000000.0) as u128;

            // Duplicate keys overwrite earlier entries, so fewer than
            // NUM_KEYS distinct keys may end up in the map.
            all_data.insert(key_int, idx as u64);
        }

        // Build a tree from it (BTreeMap iteration yields keys in sorted order)
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);

        for (&key, &val) in all_data.iter() {
            writer.append(&u128::to_be_bytes(key), val)?;
        }
        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        // Test get() operation on all the keys
        for (&key, &val) in all_data.iter() {
            let search_key = u128::to_be_bytes(key);
            assert_eq!(reader.get(&search_key).await?, Some(val));
        }

        // Test get() operations on random keys, most of which will not exist
        for _ in 0..100000 {
            let key_int = rand::thread_rng().gen::<u128>();
            let search_key = u128::to_be_bytes(key_int);
            assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned());
        }

        // Test boundary cases
        assert!(
            reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned()
        );
        assert!(
            reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned()
        );

        Ok(())
    }

    /// Appending a key smaller than the previous one must fail with
    /// `DiskBtreeError::UnsortedInput`, reporting both the rejected key and the
    /// last accepted key.
    #[test]
    fn unsorted_input() {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

        // The results of the first two (sorted) appends are ignored; only the
        // third, out-of-order append is checked.
        let _ = writer.append(b"ba", 1);
        let _ = writer.append(b"bb", 2);
        let err = writer.append(b"aa", 3).expect_err("should've failed");
        match err {
            DiskBtreeError::UnsortedInput { key, last_key } => {
                assert_eq!(key.as_ref(), b"aa".as_slice());
                assert_eq!(last_key.as_ref(), b"bb".as_slice());
            }
            _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
        }
    }

    ///
    /// This test uses a fixed data set of 26-byte keys, see disk_btree_test_data.rs
    ///
    #[tokio::test]
    async fn particular_data() -> Result<()> {
        // Build a tree from it
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);

        for (key, val) in disk_btree_test_data::TEST_DATA {
            writer.append(&key, val)?;
        }
        let (root_offset, writer) = writer.finish()?;

        // Report how many blocks the tree occupies (shown with `--nocapture`).
        println!("SIZE: {} blocks", writer.blocks.len());

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        // Test get() operation on all the keys
        for (key, val) in disk_btree_test_data::TEST_DATA {
            assert_eq!(reader.get(&key).await?, Some(val));
        }

        // Test full scan: every entry must be visited exactly once
        let mut count = 0;
        reader
            .visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
                count += 1;
                true
            })
            .await?;
        assert_eq!(count, disk_btree_test_data::TEST_DATA.len());

        reader.dump().await?;

        Ok(())
    }
}
    1018              : 
    1019              : #[cfg(test)]
    1020              : #[path = "disk_btree_test_data.rs"]
    1021              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta