LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 98.7 % 746 736
Test Date: 2024-02-29 11:57:12 Functions: 77.5 % 218 169

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use byteorder::{ReadBytesExt, BE};
      22              : use bytes::{BufMut, Bytes, BytesMut};
      23              : use either::Either;
      24              : use hex;
      25              : use std::{cmp::Ordering, io, result};
      26              : use thiserror::Error;
      27              : use tracing::error;
      28              : 
      29              : use crate::{
      30              :     context::{DownloadBehavior, RequestContext},
      31              :     task_mgr::TaskKind,
      32              :     tenant::block_io::{BlockReader, BlockWriter},
      33              : };
      34              : 
      35              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      36              : pub const VALUE_SZ: usize = 5;
      37              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      38              : 
      39              : pub const PAGE_SZ: usize = 8192;
      40              : 
      41            0 : #[derive(Clone, Copy, Debug)]
      42              : struct Value([u8; VALUE_SZ]);
      43              : 
      44              : impl Value {
      45      3039839 :     fn from_slice(slice: &[u8]) -> Value {
      46      3039839 :         let mut b = [0u8; VALUE_SZ];
      47      3039839 :         b.copy_from_slice(slice);
      48      3039839 :         Value(b)
      49      3039839 :     }
      50              : 
      51      4513257 :     fn from_u64(x: u64) -> Value {
      52      4513257 :         assert!(x <= 0x007f_ffff_ffff);
      53      4513257 :         Value([
      54      4513257 :             (x >> 32) as u8,
      55      4513257 :             (x >> 24) as u8,
      56      4513257 :             (x >> 16) as u8,
      57      4513257 :             (x >> 8) as u8,
      58      4513257 :             x as u8,
      59      4513257 :         ])
      60      4513257 :     }
      61              : 
      62         8365 :     fn from_blknum(x: u32) -> Value {
      63         8365 :         Value([
      64         8365 :             0x80,
      65         8365 :             (x >> 24) as u8,
      66         8365 :             (x >> 16) as u8,
      67         8365 :             (x >> 8) as u8,
      68         8365 :             x as u8,
      69         8365 :         ])
      70         8365 :     }
      71              : 
      72              :     #[allow(dead_code)]
      73            0 :     fn is_offset(self) -> bool {
      74            0 :         self.0[0] & 0x80 != 0
      75            0 :     }
      76              : 
      77      2512246 :     fn to_u64(self) -> u64 {
      78      2512246 :         let b = &self.0;
      79      2512246 :         (b[0] as u64) << 32
      80      2512246 :             | (b[1] as u64) << 24
      81      2512246 :             | (b[2] as u64) << 16
      82      2512246 :             | (b[3] as u64) << 8
      83      2512246 :             | b[4] as u64
      84      2512246 :     }
      85              : 
      86       521569 :     fn to_blknum(self) -> u32 {
      87       521569 :         let b = &self.0;
      88       521569 :         assert!(b[0] == 0x80);
      89       521569 :         (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
      90       521569 :     }
      91              : }
      92              : 
      93            0 : #[derive(Error, Debug)]
      94              : pub enum DiskBtreeError {
      95              :     #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
      96              :     AppendOverflow(u64),
      97              : 
      98              :     #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
      99              :     UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
     100              : 
     101              :     #[error("Could not push to new leaf node")]
     102              :     FailedToPushToNewLeafNode,
     103              : 
     104              :     #[error("IoError: {0}")]
     105              :     Io(#[from] io::Error),
     106              : }
     107              : 
     108              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     109              : 
     110              : /// This is the on-disk representation.
     111              : struct OnDiskNode<'a, const L: usize> {
     112              :     // Fixed-width fields
     113              :     num_children: u16,
     114              :     level: u8,
     115              :     prefix_len: u8,
     116              :     suffix_len: u8,
     117              : 
     118              :     // Variable-length fields. These are stored on-disk after the fixed-width
     119              :     // fields, in this order. In the in-memory representation, these point to
     120              :     // the right parts in the page buffer.
     121              :     prefix: &'a [u8],
     122              :     keys: &'a [u8],
     123              :     values: &'a [u8],
     124              : }
     125              : 
     126              : impl<'a, const L: usize> OnDiskNode<'a, L> {
     127              :     ///
     128              :     /// Interpret a PAGE_SZ page as a node.
     129              :     ///
     130      1267259 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     131      1267259 :         let mut cursor = std::io::Cursor::new(buf);
     132      1267259 :         let num_children = cursor.read_u16::<BE>()?;
     133      1267259 :         let level = cursor.read_u8()?;
     134      1267259 :         let prefix_len = cursor.read_u8()?;
     135      1267259 :         let suffix_len = cursor.read_u8()?;
     136              : 
     137      1267259 :         let mut off = cursor.position();
     138      1267259 :         let prefix_off = off as usize;
     139      1267259 :         off += prefix_len as u64;
     140      1267259 : 
     141      1267259 :         let keys_off = off as usize;
     142      1267259 :         let keys_len = num_children as usize * suffix_len as usize;
     143      1267259 :         off += keys_len as u64;
     144      1267259 : 
     145      1267259 :         let values_off = off as usize;
     146      1267259 :         let values_len = num_children as usize * VALUE_SZ;
     147      1267259 :         //off += values_len as u64;
     148      1267259 : 
     149      1267259 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     150      1267259 :         let keys = &buf[keys_off..keys_off + keys_len];
     151      1267259 :         let values = &buf[values_off..values_off + values_len];
     152      1267259 : 
     153      1267259 :         Ok(OnDiskNode {
     154      1267259 :             num_children,
     155      1267259 :             level,
     156      1267259 :             prefix_len,
     157      1267259 :             suffix_len,
     158      1267259 :             prefix,
     159      1267259 :             keys,
     160      1267259 :             values,
     161      1267259 :         })
     162      1267259 :     }
     163              : 
     164              :     ///
     165              :     /// Read a value at 'idx'
     166              :     ///
     167      3039839 :     fn value(&self, idx: usize) -> Value {
     168      3039839 :         let value_off = idx * VALUE_SZ;
     169      3039839 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     170      3039839 :         Value::from_slice(value_slice)
     171      3039839 :     }
     172              : 
     173      1057182 :     fn binary_search(
     174      1057182 :         &self,
     175      1057182 :         search_key: &[u8; L],
     176      1057182 :         keybuf: &mut [u8],
     177      1057182 :     ) -> result::Result<usize, usize> {
     178      1057182 :         let mut size = self.num_children as usize;
     179      1057182 :         let mut low = 0;
     180      1057182 :         let mut high = size;
     181      8193238 :         while low < high {
     182      7342093 :             let mid = low + size / 2;
     183      7342093 : 
     184      7342093 :             let key_off = mid * self.suffix_len as usize;
     185      7342093 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     186      7342093 :             // Does this match?
     187      7342093 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     188      7342093 : 
     189      7342093 :             let cmp = keybuf[..].cmp(search_key);
     190      7342093 : 
     191      7342093 :             if cmp == Ordering::Less {
     192      4896110 :                 low = mid + 1;
     193      4896110 :             } else if cmp == Ordering::Greater {
     194      2239946 :                 high = mid;
     195      2239946 :             } else {
     196       206037 :                 return Ok(mid);
     197              :             }
     198      7136056 :             size = high - low;
     199              :         }
     200       851145 :         Err(low)
     201      1057182 :     }
     202              : }
     203              : 
     204              : ///
     205              : /// Public reader object, to search the tree.
     206              : ///
     207              : pub struct DiskBtreeReader<R, const L: usize>
     208              : where
     209              :     R: BlockReader,
     210              : {
     211              :     start_blk: u32,
     212              :     root_blk: u32,
     213              :     reader: R,
     214              : }
     215              : 
     216      1057182 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     217              : pub enum VisitDirection {
     218              :     Forwards,
     219              :     Backwards,
     220              : }
     221              : 
     222              : impl<R, const L: usize> DiskBtreeReader<R, L>
     223              : where
     224              :     R: BlockReader,
     225              : {
     226       124354 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     227       124354 :         DiskBtreeReader {
     228       124354 :             start_blk,
     229       124354 :             root_blk,
     230       124354 :             reader,
     231       124354 :         }
     232       124354 :     }
     233              : 
     234              :     ///
     235              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     236              :     ///
     237       403737 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     238       403737 :         let mut result: Option<u64> = None;
     239       403737 :         self.visit(
     240       403737 :             search_key,
     241       403737 :             VisitDirection::Forwards,
     242       403737 :             |key, value| {
     243       203713 :                 if key == search_key {
     244       201703 :                     result = Some(value);
     245       201703 :                 }
     246       203713 :                 false
     247       403737 :             },
     248       403737 :             ctx,
     249       403737 :         )
     250          287 :         .await?;
     251       403737 :         Ok(result)
     252       403737 :     }
     253              : 
     254              :     ///
     255              :     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     256              :     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
     257              :     /// backwards)
     258              :     ///
     259       535621 :     pub async fn visit<V>(
     260       535621 :         &self,
     261       535621 :         search_key: &[u8; L],
     262       535621 :         dir: VisitDirection,
     263       535621 :         mut visitor: V,
     264       535621 :         ctx: &RequestContext,
     265       535621 :     ) -> Result<bool>
     266       535621 :     where
     267       535621 :         V: FnMut(&[u8], u64) -> bool,
     268       535621 :     {
     269       535621 :         let mut stack = Vec::new();
     270       535621 :         stack.push((self.root_blk, None));
     271       535621 :         let block_cursor = self.reader.block_cursor();
     272      1461659 :         while let Some((node_blknum, opt_iter)) = stack.pop() {
     273              :             // Locate the node.
     274      1261227 :             let node_buf = block_cursor
     275      1261227 :                 .read_blk(self.start_blk + node_blknum, ctx)
     276        19665 :                 .await?;
     277              : 
     278      1261227 :             let node = OnDiskNode::deparse(node_buf.as_ref())?;
     279      1261227 :             let prefix_len = node.prefix_len as usize;
     280      1261227 :             let suffix_len = node.suffix_len as usize;
     281      1261227 : 
     282      1261227 :             assert!(node.num_children > 0);
     283              : 
     284      1261227 :             let mut keybuf = Vec::new();
     285      1261227 :             keybuf.extend(node.prefix);
     286      1261227 :             keybuf.resize(prefix_len + suffix_len, 0);
     287              : 
     288      1261227 :             let mut iter = if let Some(iter) = opt_iter {
     289       204045 :                 iter
     290      1057182 :             } else if dir == VisitDirection::Forwards {
     291              :                 // Locate the first match
     292       811315 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     293       203911 :                     Ok(idx) => idx,
     294       607404 :                     Err(idx) => {
     295       607404 :                         if node.level == 0 {
     296              :                             // Imagine that the node contains the following keys:
     297              :                             //
     298              :                             // 1
     299              :                             // 3  <-- idx
     300              :                             // 5
     301              :                             //
     302              :                             // If the search key is '2' and there is exact match,
     303              :                             // the binary search would return the index of key
     304              :                             // '3'. That's cool, '3' is the first key to return.
     305       208115 :                             idx
     306              :                         } else {
     307              :                             // This is an internal page, so each key represents a lower
     308              :                             // bound for what's in the child page. If there is no exact
     309              :                             // match, we have to return the *previous* entry.
     310              :                             //
     311              :                             // 1  <-- return this
     312              :                             // 3  <-- idx
     313              :                             // 5
     314       399289 :                             idx.saturating_sub(1)
     315              :                         }
     316              :                     }
     317              :                 };
     318       811315 :                 Either::Left(idx..node.num_children.into())
     319              :             } else {
     320       245867 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     321         2126 :                     Ok(idx) => {
     322         2126 :                         // Exact match. That's the first entry to return, and walk
     323         2126 :                         // backwards from there.
     324         2126 :                         idx
     325              :                     }
     326       243741 :                     Err(idx) => {
     327              :                         // No exact match. The binary search returned the index of the
     328              :                         // first key that's > search_key. Back off by one, and walk
     329              :                         // backwards from there.
     330       243741 :                         if let Some(idx) = idx.checked_sub(1) {
     331       238562 :                             idx
     332              :                         } else {
     333         5179 :                             return Ok(false);
     334              :                         }
     335              :                     }
     336              :                 };
     337       240688 :                 Either::Right((0..=idx).rev())
     338              :             };
     339              : 
     340              :             // idx points to the first match now. Keep going from there
     341      3438284 :             while let Some(idx) = iter.next() {
     342      3033807 :                 let key_off = idx * suffix_len;
     343      3033807 :                 let suffix = &node.keys[key_off..key_off + suffix_len];
     344      3033807 :                 keybuf[prefix_len..].copy_from_slice(suffix);
     345      3033807 :                 let value = node.value(idx);
     346      3033807 :                 #[allow(clippy::collapsible_if)]
     347      3033807 :                 if node.level == 0 {
     348              :                     // leaf
     349      2512246 :                     if !visitor(&keybuf, value.to_u64()) {
     350       330010 :                         return Ok(false);
     351      2182236 :                     }
     352              :                 } else {
     353       521561 :                     stack.push((node_blknum, Some(iter)));
     354       521561 :                     stack.push((value.to_blknum(), None));
     355       521561 :                     break;
     356              :                 }
     357              :             }
     358              :         }
     359       200432 :         Ok(true)
     360       535621 :     }
     361              : 
     362              :     #[allow(dead_code)]
     363           10 :     pub async fn dump(&self) -> Result<()> {
     364           10 :         let mut stack = Vec::new();
     365           10 :         let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     366           10 : 
     367           10 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     368           10 : 
     369           10 :         let block_cursor = self.reader.block_cursor();
     370              : 
     371         6042 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     372         6032 :             let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
     373         6032 :             let buf: &[u8] = blk.as_ref();
     374         6032 :             let node = OnDiskNode::<L>::deparse(buf)?;
     375              : 
     376         6032 :             if child_idx == 0 {
     377           18 :                 print!("{:indent$}", "", indent = depth * 2);
     378           18 :                 let path_prefix = stack
     379           18 :                     .iter()
     380           18 :                     .map(|(_blknum, path, ..)| path.as_str())
     381           18 :                     .collect::<String>();
     382           18 :                 println!(
     383           18 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     384           18 :                     hex::encode(node.prefix),
     385           18 :                     node.suffix_len
     386           18 :                 );
     387         6014 :             }
     388              : 
     389         6032 :             if child_idx + 1 < node.num_children {
     390         6014 :                 let key_off = key_off + node.suffix_len as usize;
     391         6014 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     392         6014 :             }
     393         6032 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     394         6032 :             let val = node.value(child_idx as usize);
     395         6032 : 
     396         6032 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     397         6032 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     398         6032 : 
     399         6032 :             if node.level > 0 {
     400            8 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     401         6024 :             }
     402              :         }
     403           10 :         Ok(())
     404           10 :     }
     405              : }
     406              : 
     407              : ///
     408              : /// Public builder object, for creating a new tree.
     409              : ///
     410              : /// Usage: Create a builder object by calling 'new', load all the data into the
     411              : /// tree by calling 'append' for each key-value pair, and then call 'finish'
     412              : ///
     413              : /// 'L' is the key length in bytes
     414              : pub struct DiskBtreeBuilder<W, const L: usize>
     415              : where
     416              :     W: BlockWriter,
     417              : {
     418              :     writer: W,
     419              : 
     420              :     ///
     421              :     /// `stack[0]` is the current root page, `stack.last()` is the leaf.
     422              :     ///
     423              :     /// We maintain the length of the stack to be always greater than zero.
     424              :     /// Two exceptions are:
     425              :     /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
     426              :     ///   So because other methods cannot see the intermediate state invariant still holds.
     427              :     /// 2. `Self::finish`. It consumes self and does not return it back,
     428              :     ///  which means that this is where the structure is destroyed.
     429              :     ///  Thus stack of zero length cannot be observed by other methods.
     430              :     stack: Vec<BuildNode<L>>,
     431              : 
     432              :     /// Last key that was appended to the tree. Used to sanity check that append
     433              :     /// is called in increasing key order.
     434              :     last_key: Option<[u8; L]>,
     435              : }
     436              : 
     437              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     438              : where
     439              :     W: BlockWriter,
     440              : {
     441          564 :     pub fn new(writer: W) -> Self {
     442          564 :         DiskBtreeBuilder {
     443          564 :             writer,
     444          564 :             last_key: None,
     445          564 :             stack: vec![BuildNode::new(0)],
     446          564 :         }
     447          564 :     }
     448              : 
     449      4513259 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     450      4513259 :         if value > MAX_VALUE {
     451            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     452      4513259 :         }
     453      4513259 :         if let Some(last_key) = &self.last_key {
     454      4512695 :             if key <= last_key {
     455            2 :                 return Err(DiskBtreeError::UnsortedInput {
     456            2 :                     key: key.as_slice().into(),
     457            2 :                     last_key: last_key.as_slice().into(),
     458            2 :                 });
     459      4512693 :             }
     460          564 :         }
     461      4513257 :         self.last_key = Some(*key);
     462      4513257 : 
     463      4513257 :         self.append_internal(key, Value::from_u64(value))
     464      4513259 :     }
     465              : 
     466      4521622 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     467      4521622 :         // Try to append to the current leaf buffer
     468      4521622 :         let last = self
     469      4521622 :             .stack
     470      4521622 :             .last_mut()
     471      4521622 :             .expect("should always have at least one item");
     472      4521622 :         let level = last.level;
     473      4521622 :         if last.push(key, value) {
     474      4505602 :             return Ok(());
     475        16020 :         }
     476        16020 : 
     477        16020 :         // It did not fit. Try to compress, and if it succeeds to make
     478        16020 :         // some room on the node, try appending to it again.
     479        16020 :         #[allow(clippy::collapsible_if)]
     480        16020 :         if last.compress() {
     481         8099 :             if last.push(key, value) {
     482         8091 :                 return Ok(());
     483            8 :             }
     484         7921 :         }
     485              : 
     486              :         // Could not append to the current leaf. Flush it and create a new one.
     487         7929 :         self.flush_node()?;
     488              : 
     489              :         // Replace the node we flushed with an empty one and append the new
     490              :         // key to it.
     491         7929 :         let mut last = BuildNode::new(level);
     492         7929 :         if !last.push(key, value) {
     493            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     494         7929 :         }
     495         7929 : 
     496         7929 :         self.stack.push(last);
     497         7929 : 
     498         7929 :         Ok(())
     499      4521622 :     }
     500              : 
     501              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     502              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     503              :     /// creates a new root page, increasing the height of the tree.
     504         8365 :     fn flush_node(&mut self) -> Result<()> {
     505         8365 :         // Get the current bottommost node in the stack and flush it to disk.
     506         8365 :         let last = self
     507         8365 :             .stack
     508         8365 :             .pop()
     509         8365 :             .expect("should always have at least one item");
     510         8365 :         let buf = last.pack();
     511         8365 :         let downlink_key = last.first_key();
     512         8365 :         let downlink_ptr = self.writer.write_blk(buf)?;
     513              : 
     514              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     515              :         // create a new root page, increasing the height of the tree.
     516         8365 :         if self.stack.is_empty() {
     517          436 :             self.stack.push(BuildNode::new(last.level + 1));
     518         7929 :         }
     519         8365 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     520         8365 :     }
     521              : 
     522              :     ///
     523              :     /// Flushes everything to disk, and returns the block number of the root page.
     524              :     /// The caller must store the root block number "out-of-band", and pass it
     525              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     526              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     527              :     /// in the summary header)
     528              :     ///
     529          562 :     pub fn finish(mut self) -> Result<(u32, W)> {
     530              :         // flush all levels, except the root.
     531          998 :         while self.stack.len() > 1 {
     532          436 :             self.flush_node()?;
     533              :         }
     534              : 
     535          562 :         let root = self
     536          562 :             .stack
     537          562 :             .first()
     538          562 :             .expect("by the check above we left one item there");
     539          562 :         let buf = root.pack();
     540          562 :         let root_blknum = self.writer.write_blk(buf)?;
     541              : 
     542          562 :         Ok((root_blknum, self.writer))
     543          562 :     }
     544              : 
     545      2009970 :     pub fn borrow_writer(&self) -> &W {
     546      2009970 :         &self.writer
     547      2009970 :     }
     548              : }
     549              : 
     550              : ///
     551              : /// BuildNode represesnts an incomplete page that we are appending to.
     552              : ///
     553            0 : #[derive(Clone, Debug)]
     554              : struct BuildNode<const L: usize> {
     555              :     num_children: u16,
     556              :     level: u8,
     557              :     prefix: Vec<u8>,
     558              :     suffix_len: usize,
     559              : 
     560              :     keys: Vec<u8>,
     561              :     values: Vec<u8>,
     562              : 
     563              :     size: usize, // physical size of this node, if it was written to disk like this
     564              : }
     565              : 
     566              : const NODE_SIZE: usize = PAGE_SZ;
     567              : 
     568              : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     569              : 
     570              : impl<const L: usize> BuildNode<L> {
     571         8929 :     fn new(level: u8) -> Self {
     572         8929 :         BuildNode {
     573         8929 :             num_children: 0,
     574         8929 :             level,
     575         8929 :             prefix: Vec::new(),
     576         8929 :             suffix_len: 0,
     577         8929 :             keys: Vec::new(),
     578         8929 :             values: Vec::new(),
     579         8929 :             size: NODE_HDR_SIZE,
     580         8929 :         }
     581         8929 :     }
     582              : 
     583              :     /// Try to append a key-value pair to this node. Returns 'true' on
     584              :     /// success, 'false' if the page was full or the key was
     585              :     /// incompatible with the prefix of the existing keys.
     586      4537650 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     587      4537650 :         // If we have already performed prefix-compression on the page,
     588      4537650 :         // check that the incoming key has the same prefix.
     589      4537650 :         if self.num_children > 0 {
     590              :             // does the prefix allow it?
     591      4528721 :             if !key.starts_with(&self.prefix) {
     592          195 :                 return false;
     593      4528526 :             }
     594         8929 :         } else {
     595         8929 :             self.suffix_len = key.len();
     596         8929 :         }
     597              : 
     598              :         // Is the node too full?
     599      4537455 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     600        15833 :             return false;
     601      4521622 :         }
     602      4521622 : 
     603      4521622 :         // All clear
     604      4521622 :         self.num_children += 1;
     605      4521622 :         self.keys.extend(&key[self.prefix.len()..]);
     606      4521622 :         self.values.extend(value.0);
     607      4521622 : 
     608      4521622 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     609      4521622 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     610              : 
     611      4521622 :         self.size += self.suffix_len + VALUE_SZ;
     612      4521622 : 
     613      4521622 :         true
     614      4537650 :     }
     615              : 
     616              :     ///
     617              :     /// Perform prefix-compression.
     618              :     ///
     619              :     /// Returns 'true' on success, 'false' if no compression was possible.
     620              :     ///
     621        16020 :     fn compress(&mut self) -> bool {
     622        16020 :         let first_suffix = self.first_suffix();
     623        16020 :         let last_suffix = self.last_suffix();
     624        16020 : 
     625        16020 :         // Find the common prefix among all keys
     626        16020 :         let mut prefix_len = 0;
     627       145275 :         while prefix_len < self.suffix_len {
     628       145275 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     629        16020 :                 break;
     630       129255 :             }
     631       129255 :             prefix_len += 1;
     632              :         }
     633        16020 :         if prefix_len == 0 {
     634         7921 :             return false;
     635         8099 :         }
     636         8099 : 
     637         8099 :         // Can compress. Rewrite the keys without the common prefix.
     638         8099 :         self.prefix.extend(&self.keys[..prefix_len]);
     639         8099 : 
     640         8099 :         let mut new_keys = Vec::new();
     641         8099 :         let mut key_off = 0;
     642      2171590 :         while key_off < self.keys.len() {
     643      2163491 :             let next_key_off = key_off + self.suffix_len;
     644      2163491 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     645      2163491 :             key_off = next_key_off;
     646      2163491 :         }
     647         8099 :         self.keys = new_keys;
     648         8099 :         self.suffix_len -= prefix_len;
     649         8099 : 
     650         8099 :         self.size -= prefix_len * self.num_children as usize;
     651         8099 :         self.size += prefix_len;
     652         8099 : 
     653         8099 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     654         8099 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     655              : 
     656         8099 :         true
     657        16020 :     }
     658              : 
     659              :     ///
     660              :     /// Serialize the node to on-disk format.
     661              :     ///
     662         8927 :     fn pack(&self) -> Bytes {
     663         8927 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     664         8927 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     665         8927 :         assert!(self.num_children > 0);
     666              : 
     667         8927 :         let mut buf = BytesMut::new();
     668         8927 : 
     669         8927 :         buf.put_u16(self.num_children);
     670         8927 :         buf.put_u8(self.level);
     671         8927 :         buf.put_u8(self.prefix.len() as u8);
     672         8927 :         buf.put_u8(self.suffix_len as u8);
     673         8927 :         buf.put(&self.prefix[..]);
     674         8927 :         buf.put(&self.keys[..]);
     675         8927 :         buf.put(&self.values[..]);
     676         8927 : 
     677         8927 :         assert!(buf.len() == self.size);
     678              : 
     679         8927 :         assert!(buf.len() <= PAGE_SZ);
     680         8927 :         buf.resize(PAGE_SZ, 0);
     681         8927 :         buf.freeze()
     682         8927 :     }
     683              : 
     684        24385 :     fn first_suffix(&self) -> &[u8] {
     685        24385 :         &self.keys[..self.suffix_len]
     686        24385 :     }
     687        16020 :     fn last_suffix(&self) -> &[u8] {
     688        16020 :         &self.keys[self.keys.len() - self.suffix_len..]
     689        16020 :     }
     690              : 
     691              :     /// Return the full first key of the page, including the prefix
     692         8365 :     fn first_key(&self) -> [u8; L] {
     693         8365 :         let mut key = [0u8; L];
     694         8365 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     695         8365 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     696         8365 :         key
     697         8365 :     }
     698              : }
     699              : 
     700              : #[cfg(test)]
     701              : pub(crate) mod tests {
     702              :     use super::*;
     703              :     use crate::context::DownloadBehavior;
     704              :     use crate::task_mgr::TaskKind;
     705              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     706              :     use rand::Rng;
     707              :     use std::collections::BTreeMap;
     708              :     use std::sync::atomic::{AtomicUsize, Ordering};
     709              : 
     710           10 :     #[derive(Clone, Default)]
     711              :     pub(crate) struct TestDisk {
     712              :         blocks: Vec<Bytes>,
     713              :     }
     714              :     impl TestDisk {
     715           10 :         fn new() -> Self {
     716           10 :             Self::default()
     717           10 :         }
     718      1016504 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     719      1016504 :             let mut buf = [0u8; PAGE_SZ];
     720      1016504 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     721      1016504 :             Ok(std::sync::Arc::new(buf).into())
     722      1016504 :         }
     723              :     }
     724              :     impl BlockReader for TestDisk {
     725       411285 :         fn block_cursor(&self) -> BlockCursor<'_> {
     726       411285 :             BlockCursor::new(BlockReaderRef::TestDisk(self))
     727       411285 :         }
     728              :     }
     729              :     impl BlockWriter for &mut TestDisk {
     730          214 :         fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
     731          214 :             let blknum = self.blocks.len();
     732          214 :             self.blocks.push(buf);
     733          214 :             Ok(blknum as u32)
     734          214 :         }
     735              :     }
     736              : 
     737            2 :     #[tokio::test]
     738            2 :     async fn basic() -> Result<()> {
     739            2 :         let mut disk = TestDisk::new();
     740            2 :         let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
     741            2 : 
     742            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     743            2 : 
     744            2 :         let all_keys: Vec<&[u8; 6]> = vec![
     745            2 :             b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
     746            2 :         ];
     747            2 :         let all_data: Vec<(&[u8; 6], u64)> = all_keys
     748            2 :             .iter()
     749            2 :             .enumerate()
     750           16 :             .map(|(idx, key)| (*key, idx as u64))
     751            2 :             .collect();
     752           16 :         for (key, val) in all_data.iter() {
     753           16 :             writer.append(key, *val)?;
     754            2 :         }
     755            2 : 
     756            2 :         let (root_offset, _writer) = writer.finish()?;
     757            2 : 
     758            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     759            2 : 
     760            2 :         reader.dump().await?;
     761            2 : 
     762            2 :         // Test the `get` function on all the keys.
     763           16 :         for (key, val) in all_data.iter() {
     764           16 :             assert_eq!(reader.get(key, &ctx).await?, Some(*val));
     765            2 :         }
     766            2 :         // And on some keys that don't exist
     767            2 :         assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
     768            2 :         assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
     769            2 :         assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
     770            2 : 
     771            2 :         // Test search with `visit` function
     772            2 :         let search_key = b"xabaaa";
     773            2 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     774            2 :             .iter()
     775           16 :             .filter(|(key, _value)| key[..] >= search_key[..])
     776           10 :             .map(|(key, value)| (key.to_vec(), *value))
     777            2 :             .collect();
     778            2 : 
     779            2 :         let mut data = Vec::new();
     780            2 :         reader
     781            2 :             .visit(
     782            2 :                 search_key,
     783            2 :                 VisitDirection::Forwards,
     784           10 :                 |key, value| {
     785           10 :                     data.push((key.to_vec(), value));
     786           10 :                     true
     787           10 :                 },
     788            2 :                 &ctx,
     789            2 :             )
     790            2 :             .await?;
     791            2 :         assert_eq!(data, expected);
     792            2 : 
     793            2 :         // Test a backwards scan
     794            2 :         let mut expected: Vec<(Vec<u8>, u64)> = all_data
     795            2 :             .iter()
     796           16 :             .filter(|(key, _value)| key[..] <= search_key[..])
     797            8 :             .map(|(key, value)| (key.to_vec(), *value))
     798            2 :             .collect();
     799            2 :         expected.reverse();
     800            2 :         let mut data = Vec::new();
     801            2 :         reader
     802            2 :             .visit(
     803            2 :                 search_key,
     804            2 :                 VisitDirection::Backwards,
     805            8 :                 |key, value| {
     806            8 :                     data.push((key.to_vec(), value));
     807            8 :                     true
     808            8 :                 },
     809            2 :                 &ctx,
     810            2 :             )
     811            2 :             .await?;
     812            2 :         assert_eq!(data, expected);
     813            2 : 
     814            2 :         // Backward scan where nothing matches
     815            2 :         reader
     816            2 :             .visit(
     817            2 :                 b"aaaaaa",
     818            2 :                 VisitDirection::Backwards,
     819            2 :                 |key, value| {
     820            0 :                     panic!("found unexpected key {}: {}", hex::encode(key), value);
     821            2 :                 },
     822            2 :                 &ctx,
     823            2 :             )
     824            2 :             .await?;
     825            2 : 
     826            2 :         // Full scan
     827            2 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     828            2 :             .iter()
     829           16 :             .map(|(key, value)| (key.to_vec(), *value))
     830            2 :             .collect();
     831            2 :         let mut data = Vec::new();
     832            2 :         reader
     833            2 :             .visit(
     834            2 :                 &[0u8; 6],
     835            2 :                 VisitDirection::Forwards,
     836           16 :                 |key, value| {
     837           16 :                     data.push((key.to_vec(), value));
     838           16 :                     true
     839           16 :                 },
     840            2 :                 &ctx,
     841            2 :             )
     842            2 :             .await?;
     843            2 :         assert_eq!(data, expected);
     844            2 : 
     845            2 :         Ok(())
     846            2 :     }
     847              : 
     848            2 :     #[tokio::test]
     849            2 :     async fn lots_of_keys() -> Result<()> {
     850            2 :         let mut disk = TestDisk::new();
     851            2 :         let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
     852            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     853            2 : 
     854            2 :         const NUM_KEYS: u64 = 1000;
     855            2 : 
     856            2 :         let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
     857            2 : 
     858         2002 :         for idx in 0..NUM_KEYS {
     859         2000 :             let key_int: u64 = 1 + idx * 2;
     860         2000 :             let key = u64::to_be_bytes(key_int);
     861         2000 :             writer.append(&key, idx)?;
     862            2 : 
     863         2000 :             all_data.insert(key_int, idx);
     864            2 :         }
     865            2 : 
     866            2 :         let (root_offset, _writer) = writer.finish()?;
     867            2 : 
     868            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     869            2 : 
     870            2 :         reader.dump().await?;
     871            2 : 
     872            2 :         use std::sync::Mutex;
     873            2 : 
     874            2 :         let result = Mutex::new(Vec::new());
     875            2 :         let limit: AtomicUsize = AtomicUsize::new(10);
     876        83820 :         let take_ten = |key: &[u8], value: u64| {
     877        83820 :             let mut keybuf = [0u8; 8];
     878        83820 :             keybuf.copy_from_slice(key);
     879        83820 :             let key_int = u64::from_be_bytes(keybuf);
     880        83820 : 
     881        83820 :             let mut result = result.lock().unwrap();
     882        83820 :             result.push((key_int, value));
     883        83820 : 
     884        83820 :             // keep going until we have 10 matches
     885        83820 :             result.len() < limit.load(Ordering::Relaxed)
     886        83820 :         };
     887            2 : 
     888         4020 :         for search_key_int in 0..(NUM_KEYS * 2 + 10) {
     889         4020 :             let search_key = u64::to_be_bytes(search_key_int);
     890         4020 :             assert_eq!(
     891         4020 :                 reader.get(&search_key, &ctx).await?,
     892         4020 :                 all_data.get(&search_key_int).cloned()
     893            2 :             );
     894            2 : 
     895            2 :             // Test a forward scan starting with this key
     896         4020 :             result.lock().unwrap().clear();
     897         4020 :             reader
     898         4020 :                 .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
     899            2 :                 .await?;
     900         4020 :             let expected = all_data
     901         4020 :                 .range(search_key_int..)
     902         4020 :                 .take(10)
     903        39820 :                 .map(|(&key, &val)| (key, val))
     904         4020 :                 .collect::<Vec<(u64, u64)>>();
     905         4020 :             assert_eq!(*result.lock().unwrap(), expected);
     906            2 : 
     907            2 :             // And a backwards scan
     908         4020 :             result.lock().unwrap().clear();
     909         4020 :             reader
     910         4020 :                 .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
     911            2 :                 .await?;
     912         4020 :             let expected = all_data
     913         4020 :                 .range(..=search_key_int)
     914         4020 :                 .rev()
     915         4020 :                 .take(10)
     916        40000 :                 .map(|(&key, &val)| (key, val))
     917         4020 :                 .collect::<Vec<(u64, u64)>>();
     918         4020 :             assert_eq!(*result.lock().unwrap(), expected);
     919            2 :         }
     920            2 : 
     921            2 :         // full scan
     922            2 :         let search_key = u64::to_be_bytes(0);
     923            2 :         limit.store(usize::MAX, Ordering::Relaxed);
     924            2 :         result.lock().unwrap().clear();
     925            2 :         reader
     926            2 :             .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
     927            2 :             .await?;
     928            2 :         let expected = all_data
     929            2 :             .iter()
     930         2000 :             .map(|(&key, &val)| (key, val))
     931            2 :             .collect::<Vec<(u64, u64)>>();
     932            2 :         assert_eq!(*result.lock().unwrap(), expected);
     933            2 : 
     934            2 :         // full scan
     935            2 :         let search_key = u64::to_be_bytes(u64::MAX);
     936            2 :         limit.store(usize::MAX, Ordering::Relaxed);
     937            2 :         result.lock().unwrap().clear();
     938            2 :         reader
     939            2 :             .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
     940            2 :             .await?;
     941            2 :         let expected = all_data
     942            2 :             .iter()
     943            2 :             .rev()
     944         2000 :             .map(|(&key, &val)| (key, val))
     945            2 :             .collect::<Vec<(u64, u64)>>();
     946            2 :         assert_eq!(*result.lock().unwrap(), expected);
     947            2 : 
     948            2 :         Ok(())
     949            2 :     }
     950              : 
     951            2 :     #[tokio::test]
     952            2 :     async fn random_data() -> Result<()> {
     953            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     954            2 : 
     955            2 :         // Generate random keys with exponential distribution, to
     956            2 :         // exercise the prefix compression
     957            2 :         const NUM_KEYS: usize = 100000;
     958            2 :         let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
     959       200002 :         for idx in 0..NUM_KEYS {
     960       200000 :             let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
     961       200000 :             let t = -(f64::ln(u));
     962       200000 :             let key_int = (t * 1000000.0) as u128;
     963       200000 : 
     964       200000 :             all_data.insert(key_int, idx as u64);
     965       200000 :         }
     966            2 : 
     967            2 :         // Build a tree from it
     968            2 :         let mut disk = TestDisk::new();
     969            2 :         let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
     970            2 : 
     971       195179 :         for (&key, &val) in all_data.iter() {
     972       195179 :             writer.append(&u128::to_be_bytes(key), val)?;
     973            2 :         }
     974            2 :         let (root_offset, _writer) = writer.finish()?;
     975            2 : 
     976            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     977            2 : 
     978            2 :         // Test get() operation on all the keys
     979       195179 :         for (&key, &val) in all_data.iter() {
     980       195179 :             let search_key = u128::to_be_bytes(key);
     981       195179 :             assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
     982            2 :         }
     983            2 : 
     984            2 :         // Test get() operations on random keys, most of which will not exist
     985       200002 :         for _ in 0..100000 {
     986       200000 :             let key_int = rand::thread_rng().gen::<u128>();
     987       200000 :             let search_key = u128::to_be_bytes(key_int);
     988       200000 :             assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
     989            2 :         }
     990            2 : 
     991            2 :         // Test boundary cases
     992            2 :         assert!(
     993            2 :             reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
     994            2 :                 == all_data.get(&u128::MIN).cloned()
     995            2 :         );
     996            2 :         assert!(
     997            2 :             reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
     998            2 :                 == all_data.get(&u128::MAX).cloned()
     999            2 :         );
    1000            2 : 
    1001            2 :         Ok(())
    1002            2 :     }
    1003              : 
    1004            2 :     #[test]
    1005            2 :     fn unsorted_input() {
    1006            2 :         let mut disk = TestDisk::new();
    1007            2 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1008            2 : 
    1009            2 :         let _ = writer.append(b"ba", 1);
    1010            2 :         let _ = writer.append(b"bb", 2);
    1011            2 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1012            2 :         match err {
    1013            2 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1014            2 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1015            2 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1016              :             }
    1017            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1018              :         }
    1019            2 :     }
    1020              : 
    1021              :     ///
    1022              :     /// This test contains a particular data set, see disk_btree_test_data.rs
    1023              :     ///
    1024            2 :     #[tokio::test]
    1025            2 :     async fn particular_data() -> Result<()> {
    1026            2 :         // Build a tree from it
    1027            2 :         let mut disk = TestDisk::new();
    1028            2 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1029            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1030            2 : 
    1031         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1032         4000 :             writer.append(&key, val)?;
    1033            2 :         }
    1034            2 :         let (root_offset, writer) = writer.finish()?;
    1035            2 : 
    1036            2 :         println!("SIZE: {} blocks", writer.blocks.len());
    1037            2 : 
    1038            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1039            2 : 
    1040            2 :         // Test get() operation on all the keys
    1041         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1042         4000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1043            2 :         }
    1044            2 : 
    1045            2 :         // Test full scan
    1046            2 :         let mut count = 0;
    1047            2 :         reader
    1048            2 :             .visit(
    1049            2 :                 &[0u8; 26],
    1050            2 :                 VisitDirection::Forwards,
    1051         4000 :                 |_key, _value| {
    1052         4000 :                     count += 1;
    1053         4000 :                     true
    1054         4000 :                 },
    1055            2 :                 &ctx,
    1056            2 :             )
    1057            2 :             .await?;
    1058            2 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1059            2 : 
    1060            2 :         reader.dump().await?;
    1061            2 : 
    1062            2 :         Ok(())
    1063            2 :     }
    1064              : }
    1065              : 
    1066              : #[cfg(test)]
    1067              : #[path = "disk_btree_test_data.rs"]
    1068              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta