LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions)
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date: 2025-03-12 00:01:28
Coverage: Lines: 99.2 % (840 of 847 hit), Functions: 84.8 % (173 of 204 hit)

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
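                      : //! Example usage (an illustrative sketch that mirrors the unit tests at the
                      : //! bottom of this file; `disk` stands for any `BlockWriter`/`BlockReader`
                      : //! implementation and `ctx` for a `RequestContext`):
                      : //!
                      : //! ```ignore
                      : //! let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
                      : //! writer.append(b"xaaaaa", 42)?; // keys must be appended in ascending order
                      : //! let (root_blk, _writer) = writer.finish()?;
                      : //!
                      : //! let reader = DiskBtreeReader::new(0, root_blk, disk);
                      : //! assert_eq!(reader.get(b"xaaaaa", &ctx).await?, Some(42));
                      : //! ```
                      : //!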
      21              : use std::cmp::Ordering;
      22              : use std::iter::Rev;
      23              : use std::ops::{Range, RangeInclusive};
      24              : use std::{io, result};
      25              : 
      26              : use async_stream::try_stream;
      27              : use byteorder::{BE, ReadBytesExt};
      28              : use bytes::{BufMut, Bytes, BytesMut};
      29              : use either::Either;
      30              : use futures::{Stream, StreamExt};
      31              : use hex;
      32              : use thiserror::Error;
      33              : use tracing::error;
      34              : 
      35              : use crate::context::RequestContext;
      36              : use crate::tenant::block_io::{BlockReader, BlockWriter};
      37              : 
      38              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      39              : pub const VALUE_SZ: usize = 5;
      40              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      41              : 
      42              : pub const PAGE_SZ: usize = 8192;
      43              : 
      44              : #[derive(Clone, Copy, Debug)]
      45              : struct Value([u8; VALUE_SZ]);
      46              : 
      47              : impl Value {
      48     14125408 :     fn from_slice(slice: &[u8]) -> Value {
      49     14125408 :         let mut b = [0u8; VALUE_SZ];
      50     14125408 :         b.copy_from_slice(slice);
      51     14125408 :         Value(b)
      52     14125408 :     }
      53              : 
      54     14438380 :     fn from_u64(x: u64) -> Value {
      55     14438380 :         assert!(x <= 0x007f_ffff_ffff);
      56     14438380 :         Value([
      57     14438380 :             (x >> 32) as u8,
      58     14438380 :             (x >> 24) as u8,
      59     14438380 :             (x >> 16) as u8,
      60     14438380 :             (x >> 8) as u8,
      61     14438380 :             x as u8,
      62     14438380 :         ])
      63     14438380 :     }
      64              : 
      65        25753 :     fn from_blknum(x: u32) -> Value {
      66        25753 :         Value([
      67        25753 :             0x80,
      68        25753 :             (x >> 24) as u8,
      69        25753 :             (x >> 16) as u8,
      70        25753 :             (x >> 8) as u8,
      71        25753 :             x as u8,
      72        25753 :         ])
      73        25753 :     }
      74              : 
      75              :     #[allow(dead_code)]
      76            0 :     fn is_offset(self) -> bool {
      77            0 :         self.0[0] & 0x80 != 0
      78            0 :     }
      79              : 
      80     12862195 :     fn to_u64(self) -> u64 {
      81     12862195 :         let b = &self.0;
      82     12862195 :         ((b[0] as u64) << 32)
      83     12862195 :             | ((b[1] as u64) << 24)
      84     12862195 :             | ((b[2] as u64) << 16)
      85     12862195 :             | ((b[3] as u64) << 8)
      86     12862195 :             | b[4] as u64
      87     12862195 :     }
      88              : 
      89      1251165 :     fn to_blknum(self) -> u32 {
      90      1251165 :         let b = &self.0;
      91      1251165 :         assert!(b[0] == 0x80);
      92      1251165 :         ((b[1] as u32) << 24) | ((b[2] as u32) << 16) | ((b[3] as u32) << 8) | b[4] as u32
      93      1251165 :     }
      94              : }
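                      : 
                      : // Worked example of the 5-byte Value encoding (illustrative):
                      : //
                      : //   Value::from_u64(0x01_2345_6789).0 == [0x01, 0x23, 0x45, 0x67, 0x89]
                      : //   Value::from_blknum(7).0           == [0x80, 0x00, 0x00, 0x00, 0x07]
                      : //
                      : // The high bit of the first byte tells the two cases apart: it is always clear
                      : // for plain u64 values (hence MAX_VALUE = 0x007f_ffff_ffff and the assert in
                      : // `from_u64`) and always set for block-number downlinks; the currently unused
                      : // `is_offset` helper tests that flag bit.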
      95              : 
      96              : #[derive(Error, Debug)]
      97              : pub enum DiskBtreeError {
      98              :     #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
      99              :     AppendOverflow(u64),
     100              : 
     101              :     #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
     102              :     UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
     103              : 
     104              :     #[error("Could not push to new leaf node")]
     105              :     FailedToPushToNewLeafNode,
     106              : 
     107              :     #[error("IoError: {0}")]
     108              :     Io(#[from] io::Error),
     109              : }
     110              : 
     111              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     112              : 
     113              : /// This is the on-disk representation.
     114              : struct OnDiskNode<'a, const L: usize> {
     115              :     // Fixed-width fields
     116              :     num_children: u16,
     117              :     level: u8,
     118              :     prefix_len: u8,
     119              :     suffix_len: u8,
     120              : 
     121              :     // Variable-length fields. These are stored on-disk after the fixed-width
     122              :     // fields, in this order. In the in-memory representation, these point to
     123              :     // the right parts in the page buffer.
     124              :     prefix: &'a [u8],
     125              :     keys: &'a [u8],
     126              :     values: &'a [u8],
     127              : }
     128              : 
     129              : impl<const L: usize> OnDiskNode<'_, L> {
     130              :     ///
     131              :     /// Interpret a PAGE_SZ page as a node.
     132              :     ///
     133      3004803 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     134      3004803 :         let mut cursor = std::io::Cursor::new(buf);
     135      3004803 :         let num_children = cursor.read_u16::<BE>()?;
     136      3004803 :         let level = cursor.read_u8()?;
     137      3004803 :         let prefix_len = cursor.read_u8()?;
     138      3004803 :         let suffix_len = cursor.read_u8()?;
     139              : 
     140      3004803 :         let mut off = cursor.position();
     141      3004803 :         let prefix_off = off as usize;
     142      3004803 :         off += prefix_len as u64;
     143      3004803 : 
     144      3004803 :         let keys_off = off as usize;
     145      3004803 :         let keys_len = num_children as usize * suffix_len as usize;
     146      3004803 :         off += keys_len as u64;
     147      3004803 : 
     148      3004803 :         let values_off = off as usize;
     149      3004803 :         let values_len = num_children as usize * VALUE_SZ;
     150      3004803 :         //off += values_len as u64;
     151      3004803 : 
     152      3004803 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     153      3004803 :         let keys = &buf[keys_off..keys_off + keys_len];
     154      3004803 :         let values = &buf[values_off..values_off + values_len];
     155      3004803 : 
     156      3004803 :         Ok(OnDiskNode {
     157      3004803 :             num_children,
     158      3004803 :             level,
     159      3004803 :             prefix_len,
     160      3004803 :             suffix_len,
     161      3004803 :             prefix,
     162      3004803 :             keys,
     163      3004803 :             values,
     164      3004803 :         })
     165      3004803 :     }
     166              : 
     167              :     ///
     168              :     /// Read a value at 'idx'
     169              :     ///
     170     14125408 :     fn value(&self, idx: usize) -> Value {
     171     14125408 :         let value_off = idx * VALUE_SZ;
     172     14125408 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     173     14125408 :         Value::from_slice(value_slice)
     174     14125408 :     }
     175              : 
     176      2557047 :     fn binary_search(
     177      2557047 :         &self,
     178      2557047 :         search_key: &[u8; L],
     179      2557047 :         keybuf: &mut [u8],
     180      2557047 :     ) -> result::Result<usize, usize> {
     181      2557047 :         let mut size = self.num_children as usize;
     182      2557047 :         let mut low = 0;
     183      2557047 :         let mut high = size;
     184     19336217 :         while low < high {
     185     17235811 :             let mid = low + size / 2;
     186     17235811 : 
     187     17235811 :             let key_off = mid * self.suffix_len as usize;
     188     17235811 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     189     17235811 :             // Does this match?
     190     17235811 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     191     17235811 : 
     192     17235811 :             let cmp = keybuf[..].cmp(search_key);
     193     17235811 : 
     194     17235811 :             if cmp == Ordering::Less {
     195     10959493 :                 low = mid + 1;
     196     10959493 :             } else if cmp == Ordering::Greater {
     197      5819677 :                 high = mid;
     198      5819677 :             } else {
     199       456641 :                 return Ok(mid);
     200              :             }
     201     16779170 :             size = high - low;
     202              :         }
     203      2100406 :         Err(low)
     204      2557047 :     }
     205              : }
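                      : 
                      : // On-disk page layout read by `deparse` above and produced by `BuildNode::pack`
                      : // below (an illustrative example): a leaf node holding the two 5-byte keys
                      : // "xabaa" and "xabba", which share the prefix "xab", is laid out as
                      : //
                      : //   offset 0..2    num_children = 2    (u16, big-endian)
                      : //   offset 2       level        = 0    (leaf)
                      : //   offset 3       prefix_len   = 3
                      : //   offset 4       suffix_len   = 2
                      : //   offset 5..8    prefix = "xab"
                      : //   offset 8..12   keys   = "aa" "ba"          (num_children * suffix_len bytes)
                      : //   offset 12..22  values = two 5-byte Values  (num_children * VALUE_SZ bytes)
                      : //
                      : // and the remainder of the PAGE_SZ (8192-byte) page is zero padding.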
     206              : 
     207              : ///
     208              : /// Public reader object, to search the tree.
     209              : ///
     210              : #[derive(Clone)]
     211              : pub struct DiskBtreeReader<R, const L: usize>
     212              : where
     213              :     R: BlockReader,
     214              : {
     215              :     start_blk: u32,
     216              :     root_blk: u32,
     217              :     reader: R,
     218              : }
     219              : 
     220              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     221              : pub enum VisitDirection {
     222              :     Forwards,
     223              :     Backwards,
     224              : }
     225              : 
     226              : impl<R, const L: usize> DiskBtreeReader<R, L>
     227              : where
     228              :     R: BlockReader,
     229              : {
     230       483078 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     231       483078 :         DiskBtreeReader {
     232       483078 :             start_blk,
     233       483078 :             root_blk,
     234       483078 :             reader,
     235       483078 :         }
     236       483078 :     }
     237              : 
     238              :     ///
     239              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     240              :     ///
     241       806244 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     242       806244 :         let mut result: Option<u64> = None;
     243       806244 :         self.visit(
     244       806244 :             search_key,
     245       806244 :             VisitDirection::Forwards,
     246       806244 :             |key, value| {
     247       406196 :                 if key == search_key {
     248       402185 :                     result = Some(value);
     249       402185 :                 }
     250       406196 :                 false
     251       806244 :             },
     252       806244 :             ctx,
     253       806244 :         )
     254       806244 :         .await?;
     255       806244 :         Ok(result)
     256       806244 :     }
     257              : 
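                      :     /// Return an iterator over all key/value pairs starting from the first key
                      :     /// greater than or equal to `start_key`.
                      :     ///
                      :     /// A sketch of typical use (illustrative; `reader`, `start_key` and `ctx`
                      :     /// stand for values constructed as elsewhere in this file). Note that `iter`
                      :     /// consumes the reader, for the reasons explained on [`Self::into_stream`]:
                      :     ///
                      :     /// ```ignore
                      :     /// let mut it = reader.iter(&start_key, &ctx);
                      :     /// while let Some(res) = it.next().await {
                      :     ///     let (key, value) = res?;
                      :     ///     // ... use key / value ...
                      :     /// }
                      :     /// ```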
     258         1396 :     pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
     259         1396 :     where
     260         1396 :         R: 'a + Send,
     261         1396 :     {
     262         1396 :         DiskBtreeIterator {
     263         1396 :             stream: Box::pin(self.into_stream(start_key, ctx)),
     264         1396 :         }
     265         1396 :     }
     266              : 
     267              :     /// Return a stream which yields all key, value pairs from the index
     268              :     /// starting from the first key greater or equal to `start_key`.
     269              :     ///
     270              :     /// Note 1: this is a copy of [`Self::visit`].
     271              :     /// TODO: Once the sequential read path is removed this will become
     272              :     /// the only index traversal method.
     273              :     ///
     274              :     /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
     275              :     /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
     276              :     /// requires the reader to be present when the stream is used, and this creates a lifetime
     277              :     /// dependency between the reader and the stream. Now if we want to create an iterator that
     278              :     /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
     279              :     /// to use from the image/delta layer APIs.
     280              :     ///
     281              :     /// Feel free to add the `&self` variant back if it's necessary.
     282       482734 :     pub fn into_stream<'a>(
     283       482734 :         self,
     284       482734 :         start_key: &'a [u8; L],
     285       482734 :         ctx: &'a RequestContext,
     286       482734 :     ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
     287       482734 :     where
     288       482734 :         R: 'a,
     289       482734 :     {
     290       482734 :         try_stream! {
     291       482734 :             let mut stack = Vec::new();
     292       482734 :             stack.push((self.root_blk, None));
     293       482734 :             let block_cursor = self.reader.block_cursor();
     294       482734 :             let mut node_buf = [0_u8; PAGE_SZ];
     295       482734 :             while let Some((node_blknum, opt_iter)) = stack.pop() {
     296       482734 :                 // Read the node, through the PS PageCache, into local variable `node_buf`.
     297       482734 :                 // We could keep the page cache read guard alive, but, at the time of writing,
     298       482734 :                 // we run quite small PS PageCaches => can't risk running out of
     299       482734 :                 // PageCache space because this stream isn't consumed fast enough.
     300       482734 :                 let page_read_guard = block_cursor
     301       482734 :                     .read_blk(self.start_blk + node_blknum, ctx)
     302       482734 :                     .await?;
     303       482734 :                 node_buf.copy_from_slice(page_read_guard.as_ref());
     304       482734 :                 drop(page_read_guard); // drop page cache read guard early
     305       482734 : 
     306       482734 :                 let node = OnDiskNode::deparse(&node_buf)?;
     307       482734 :                 let prefix_len = node.prefix_len as usize;
     308       482734 :                 let suffix_len = node.suffix_len as usize;
     309       482734 : 
     310       482734 :                 assert!(node.num_children > 0);
     311       482734 : 
     312       482734 :                 let mut keybuf = Vec::new();
     313       482734 :                 keybuf.extend(node.prefix);
     314       482734 :                 keybuf.resize(prefix_len + suffix_len, 0);
     315       482734 : 
     316       482734 :                 let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
     317       482734 :                     iter
     318       482734 :                 } else {
     319       482734 :                     // Locate the first match
     320       482734 :                     let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
     321       482734 :                         Ok(idx) => idx,
     322       482734 :                         Err(idx) => {
     323       482734 :                             if node.level == 0 {
     324       482734 :                                 // Imagine that the node contains the following keys:
     325       482734 :                                 //
     326       482734 :                                 // 1
     327       482734 :                                 // 3  <-- idx
     328       482734 :                                 // 5
     329       482734 :                                 //
     330       482734 :                 // If the search key is '2' and there is no exact match,
     331       482734 :                                 // the binary search would return the index of key
     332       482734 :                                 // '3'. That's cool, '3' is the first key to return.
     333       482734 :                                 idx
     334       482734 :                             } else {
     335       482734 :                                 // This is an internal page, so each key represents a lower
     336       482734 :                                 // bound for what's in the child page. If there is no exact
     337       482734 :                                 // match, we have to return the *previous* entry.
     338       482734 :                                 //
     339       482734 :                                 // 1  <-- return this
     340       482734 :                                 // 3  <-- idx
     341       482734 :                                 // 5
     342       482734 :                                 idx.saturating_sub(1)
     343       482734 :                             }
     344       482734 :                         }
     345       482734 :                     };
     346       482734 :                     Either::Left(idx..node.num_children.into())
     347       482734 :                 };
     348       482734 : 
     349       482734 : 
     350       482734 :                 // idx points to the first match now. Keep going from there
     351       482734 :                 while let Some(idx) = iter.next() {
     352       482734 :                     let key_off = idx * suffix_len;
     353       482734 :                     let suffix = &node.keys[key_off..key_off + suffix_len];
     354       482734 :                     keybuf[prefix_len..].copy_from_slice(suffix);
     355       482734 :                     let value = node.value(idx);
     356       482734 :                     #[allow(clippy::collapsible_if)]
     357       482734 :                     if node.level == 0 {
     358       482734 :                         // leaf
     359       482734 :                         yield (keybuf.clone(), value.to_u64());
     360       482734 :                     } else {
     361       482734 :                         stack.push((node_blknum, Some(iter)));
     362       482734 :                         stack.push((value.to_blknum(), None));
     363       482734 :                         break;
     364       482734 :                     }
     365       482734 :                 }
     366       482734 :             }
     367       482734 :         }
     368       482734 :     }
     369              : 
     370              :     ///
     371              :     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     372              :     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
     373              :     /// backwards)
     374              :     ///
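                      :     /// A sketch of a call (illustrative; real usage is in the unit tests below)
                      :     /// that collects at most ten entries at or after `search_key`, stopping early
                      :     /// by returning `false` from the visitor:
                      :     ///
                      :     /// ```ignore
                      :     /// let mut hits = Vec::new();
                      :     /// reader
                      :     ///     .visit(&search_key, VisitDirection::Forwards, |key, value| {
                      :     ///         hits.push((key.to_vec(), value));
                      :     ///         hits.len() < 10
                      :     ///     }, &ctx)
                      :     ///     .await?;
                      :     /// ```
                      :     ///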
     375       823164 :     pub async fn visit<V>(
     376       823164 :         &self,
     377       823164 :         search_key: &[u8; L],
     378       823164 :         dir: VisitDirection,
     379       823164 :         mut visitor: V,
     380       823164 :         ctx: &RequestContext,
     381       823164 :     ) -> Result<bool>
     382       823164 :     where
     383       823164 :         V: FnMut(&[u8], u64) -> bool,
     384       823164 :     {
     385       823164 :         let mut stack = Vec::new();
     386       823164 :         stack.push((self.root_blk, None));
     387       823164 :         let block_cursor = self.reader.block_cursor();
     388      2437980 :         while let Some((node_blknum, opt_iter)) = stack.pop() {
     389              :             // Locate the node.
     390      2036912 :             let node_buf = block_cursor
     391      2036912 :                 .read_blk(self.start_blk + node_blknum, ctx)
     392      2036912 :                 .await?;
     393              : 
     394      2036912 :             let node = OnDiskNode::deparse(node_buf.as_ref())?;
     395      2036912 :             let prefix_len = node.prefix_len as usize;
     396      2036912 :             let suffix_len = node.suffix_len as usize;
     397      2036912 : 
     398      2036912 :             assert!(node.num_children > 0);
     399              : 
     400      2036912 :             let mut keybuf = Vec::new();
     401      2036912 :             keybuf.extend(node.prefix);
     402      2036912 :             keybuf.resize(prefix_len + suffix_len, 0);
     403              : 
     404      2036912 :             let mut iter = if let Some(iter) = opt_iter {
     405       407796 :                 iter
     406      1629116 :             } else if dir == VisitDirection::Forwards {
     407              :                 // Locate the first match
     408      1621064 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     409       406599 :                     Ok(idx) => idx,
     410      1214465 :                     Err(idx) => {
     411      1214465 :                         if node.level == 0 {
     412              :                             // Imagine that the node contains the following keys:
     413              :                             //
     414              :                             // 1
     415              :                             // 3  <-- idx
     416              :                             // 5
     417              :                             //
     418              :                             // If the search key is '2' and there is no exact match,
     419              :                             // the binary search would return the index of key
     420              :                             // '3'. That's cool, '3' is the first key to return.
     421       416231 :                             idx
     422              :                         } else {
     423              :                             // This is an internal page, so each key represents a lower
     424              :                             // bound for what's in the child page. If there is no exact
     425              :                             // match, we have to return the *previous* entry.
     426              :                             //
     427              :                             // 1  <-- return this
     428              :                             // 3  <-- idx
     429              :                             // 5
     430       798234 :                             idx.saturating_sub(1)
     431              :                         }
     432              :                     }
     433              :                 };
     434      1621064 :                 Either::Left(idx..node.num_children.into())
     435              :             } else {
     436         8052 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     437         4004 :                     Ok(idx) => {
     438         4004 :                         // Exact match. That's the first entry to return, and walk
     439         4004 :                         // backwards from there.
     440         4004 :                         idx
     441              :                     }
     442         4048 :                     Err(idx) => {
     443              :                         // No exact match. The binary search returned the index of the
     444              :                         // first key that's > search_key. Back off by one, and walk
     445              :                         // backwards from there.
     446         4048 :                         if let Some(idx) = idx.checked_sub(1) {
     447         4040 :                             idx
     448              :                         } else {
     449            8 :                             return Ok(false);
     450              :                         }
     451              :                     }
     452              :                 };
     453         8044 :                 Either::Right((0..=idx).rev())
     454              :             };
     455              : 
     456              :             // idx points to the first match now. Keep going from there
     457      6324812 :             while let Some(idx) = iter.next() {
     458      5515948 :                 let key_off = idx * suffix_len;
     459      5515948 :                 let suffix = &node.keys[key_off..key_off + suffix_len];
     460      5515948 :                 keybuf[prefix_len..].copy_from_slice(suffix);
     461      5515948 :                 let value = node.value(idx);
     462      5515948 :                 #[allow(clippy::collapsible_if)]
     463      5515948 :                 if node.level == 0 {
     464              :                     // leaf
     465      4709996 :                     if !visitor(&keybuf, value.to_u64()) {
     466       422088 :                         return Ok(false);
     467      4287908 :                     }
     468              :                 } else {
     469       805952 :                     stack.push((node_blknum, Some(iter)));
     470       805952 :                     stack.push((value.to_blknum(), None));
     471       805952 :                     break;
     472              :                 }
     473              :             }
     474              :         }
     475       401068 :         Ok(true)
     476       823164 :     }
     477              : 
     478              :     #[allow(dead_code)]
     479           20 :     pub async fn dump(&self, ctx: &RequestContext) -> Result<()> {
     480           20 :         let mut stack = Vec::new();
     481           20 : 
     482           20 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     483           20 : 
     484           20 :         let block_cursor = self.reader.block_cursor();
     485              : 
     486        12084 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     487        12064 :             let blk = block_cursor.read_blk(self.start_blk + blknum, ctx).await?;
     488        12064 :             let buf: &[u8] = blk.as_ref();
     489        12064 :             let node = OnDiskNode::<L>::deparse(buf)?;
     490              : 
     491        12064 :             if child_idx == 0 {
     492           36 :                 print!("{:indent$}", "", indent = depth * 2);
     493           36 :                 let path_prefix = stack
     494           36 :                     .iter()
     495           36 :                     .map(|(_blknum, path, ..)| path.as_str())
     496           36 :                     .collect::<String>();
     497           36 :                 println!(
     498           36 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     499           36 :                     hex::encode(node.prefix),
     500           36 :                     node.suffix_len
     501           36 :                 );
     502        12028 :             }
     503              : 
     504        12064 :             if child_idx + 1 < node.num_children {
     505        12028 :                 let key_off = key_off + node.suffix_len as usize;
     506        12028 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     507        12028 :             }
     508        12064 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     509        12064 :             let val = node.value(child_idx as usize);
     510        12064 : 
     511        12064 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     512        12064 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     513        12064 : 
     514        12064 :             if node.level > 0 {
     515           16 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     516        12048 :             }
     517              :         }
     518           20 :         Ok(())
     519           20 :     }
     520              : }
     521              : 
     522              : pub struct DiskBtreeIterator<'a> {
     523              :     #[allow(clippy::type_complexity)]
     524              :     stream: std::pin::Pin<
     525              :         Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
     526              :     >,
     527              : }
     528              : 
     529              : impl DiskBtreeIterator<'_> {
     530      4647472 :     pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
     531      4647472 :         self.stream.next().await
     532      4647472 :     }
     533              : }
     534              : 
     535              : ///
     536              : /// Public builder object, for creating a new tree.
     537              : ///
     538              : /// Usage: Create a builder object by calling 'new', load all the data into the
     539              : /// tree by calling 'append' for each key-value pair, and then call 'finish'
     540              : ///
     541              : /// 'L' is the key length in bytes
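                      : ///
                      : /// A sketch (illustrative): `append` enforces both limits checked in its body,
                      : /// so out-of-order keys and oversized values are rejected:
                      : ///
                      : /// ```ignore
                      : /// let mut builder = DiskBtreeBuilder::<_, 2>::new(&mut disk);
                      : /// builder.append(b"bb", 1)?;
                      : /// assert!(matches!(builder.append(b"aa", 2), Err(DiskBtreeError::UnsortedInput { .. })));
                      : /// assert!(matches!(builder.append(b"cc", MAX_VALUE + 1), Err(DiskBtreeError::AppendOverflow(_))));
                      : /// let (root_blk, _writer) = builder.finish()?;
                      : /// ```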
     542              : pub struct DiskBtreeBuilder<W, const L: usize>
     543              : where
     544              :     W: BlockWriter,
     545              : {
     546              :     writer: W,
     547              : 
     548              :     ///
     549              :     /// `stack[0]` is the current root page, `stack.last()` is the leaf.
     550              :     ///
     551              :     /// We maintain the length of the stack to be always greater than zero.
     552              :     /// Two exceptions are:
     553              :     /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
     554              :     ///    So, because other methods cannot see the intermediate state, the invariant still holds.
     555              :     /// 2. `Self::finish`. It consumes self and does not return it back,
     556              :     ///    which means that this is where the structure is destroyed.
     557              :     ///    Thus stack of zero length cannot be observed by other methods.
     558              :     stack: Vec<BuildNode<L>>,
     559              : 
     560              :     /// Last key that was appended to the tree. Used to sanity check that append
     561              :     /// is called in increasing key order.
     562              :     last_key: Option<[u8; L]>,
     563              : }
     564              : 
     565              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     566              : where
     567              :     W: BlockWriter,
     568              : {
     569         4164 :     pub fn new(writer: W) -> Self {
     570         4164 :         DiskBtreeBuilder {
     571         4164 :             writer,
     572         4164 :             last_key: None,
     573         4164 :             stack: vec![BuildNode::new(0)],
     574         4164 :         }
     575         4164 :     }
     576              : 
     577     14438384 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     578     14438384 :         if value > MAX_VALUE {
     579            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     580     14438384 :         }
     581     14438384 :         if let Some(last_key) = &self.last_key {
     582     14434652 :             if key <= last_key {
     583            4 :                 return Err(DiskBtreeError::UnsortedInput {
     584            4 :                     key: key.as_slice().into(),
     585            4 :                     last_key: last_key.as_slice().into(),
     586            4 :                 });
     587     14434648 :             }
     588         3732 :         }
     589     14438380 :         self.last_key = Some(*key);
     590     14438380 : 
     591     14438380 :         self.append_internal(key, Value::from_u64(value))
     592     14438384 :     }
     593              : 
     594     14464133 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     595     14464133 :         // Try to append to the current leaf buffer
     596     14464133 :         let last = self
     597     14464133 :             .stack
     598     14464133 :             .last_mut()
     599     14464133 :             .expect("should always have at least one item");
     600     14464133 :         let level = last.level;
     601     14464133 :         if last.push(key, value) {
     602     14415120 :             return Ok(());
     603        49013 :         }
     604        49013 : 
     605        49013 :         // It did not fit. Try to compress, and if it succeeds to make
     606        49013 :         // some room on the node, try appending to it again.
     607        49013 :         #[allow(clippy::collapsible_if)]
     608        49013 :         if last.compress() {
     609        24871 :             if last.push(key, value) {
     610        24856 :                 return Ok(());
     611           15 :             }
     612        24142 :         }
     613              : 
     614              :         // Could not append to the current leaf. Flush it and create a new one.
     615        24157 :         self.flush_node()?;
     616              : 
     617              :         // Replace the node we flushed with an empty one and append the new
     618              :         // key to it.
     619        24157 :         let mut last = BuildNode::new(level);
     620        24157 :         if !last.push(key, value) {
     621            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     622        24157 :         }
     623        24157 : 
     624        24157 :         self.stack.push(last);
     625        24157 : 
     626        24157 :         Ok(())
     627     14464133 :     }
     628              : 
     629              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     630              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     631              :     /// creates a new root page, increasing the height of the tree.
     632        25753 :     fn flush_node(&mut self) -> Result<()> {
     633        25753 :         // Get the current bottommost node in the stack and flush it to disk.
     634        25753 :         let last = self
     635        25753 :             .stack
     636        25753 :             .pop()
     637        25753 :             .expect("should always have at least one item");
     638        25753 :         let buf = last.pack();
     639        25753 :         let downlink_key = last.first_key();
     640        25753 :         let downlink_ptr = self.writer.write_blk(buf)?;
     641              : 
     642              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     643              :         // create a new root page, increasing the height of the tree.
     644        25753 :         if self.stack.is_empty() {
     645         1596 :             self.stack.push(BuildNode::new(last.level + 1));
     646        24157 :         }
     647        25753 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     648        25753 :     }
     649              : 
     650              :     ///
     651              :     /// Flushes everything to disk, and returns the block number of the root page.
     652              :     /// The caller must store the root block number "out-of-band", and pass it
     653              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     654              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     655              :     /// in the summary header)
     656              :     ///
     657         3636 :     pub fn finish(mut self) -> Result<(u32, W)> {
     658              :         // flush all levels, except the root.
     659         5232 :         while self.stack.len() > 1 {
     660         1596 :             self.flush_node()?;
     661              :         }
     662              : 
     663         3636 :         let root = self
     664         3636 :             .stack
     665         3636 :             .first()
     666         3636 :             .expect("by the check above we left one item there");
     667         3636 :         let buf = root.pack();
     668         3636 :         let root_blknum = self.writer.write_blk(buf)?;
     669              : 
     670         3636 :         Ok((root_blknum, self.writer))
     671         3636 :     }
     672              : 
     673      4095304 :     pub fn borrow_writer(&self) -> &W {
     674      4095304 :         &self.writer
     675      4095304 :     }
     676              : }
     677              : 
     678              : ///
     679              : /// BuildNode represents an incomplete page that we are appending to.
     680              : ///
     681              : #[derive(Clone, Debug)]
     682              : struct BuildNode<const L: usize> {
     683              :     num_children: u16,
     684              :     level: u8,
     685              :     prefix: Vec<u8>,
     686              :     suffix_len: usize,
     687              : 
     688              :     keys: Vec<u8>,
     689              :     values: Vec<u8>,
     690              : 
     691              :     size: usize, // physical size of this node, if it was written to disk like this
     692              : }
     693              : 
     694              : const NODE_SIZE: usize = PAGE_SZ;
     695              : 
     696              : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     697              : 
     698              : impl<const L: usize> BuildNode<L> {
     699        29917 :     fn new(level: u8) -> Self {
     700        29917 :         BuildNode {
     701        29917 :             num_children: 0,
     702        29917 :             level,
     703        29917 :             prefix: Vec::new(),
     704        29917 :             suffix_len: 0,
     705        29917 :             keys: Vec::new(),
     706        29917 :             values: Vec::new(),
     707        29917 :             size: NODE_HDR_SIZE,
     708        29917 :         }
     709        29917 :     }
     710              : 
     711              :     /// Try to append a key-value pair to this node. Returns 'true' on
     712              :     /// success, 'false' if the page was full or the key was
     713              :     /// incompatible with the prefix of the existing keys.
     714     14513161 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     715     14513161 :         // If we have already performed prefix-compression on the page,
     716     14513161 :         // check that the incoming key has the same prefix.
     717     14513161 :         if self.num_children > 0 {
     718              :             // does the prefix allow it?
     719     14483676 :             if !key.starts_with(&self.prefix) {
     720          417 :                 return false;
     721     14483259 :             }
     722        29485 :         } else {
     723        29485 :             self.suffix_len = key.len();
     724        29485 :         }
     725              : 
     726              :         // Is the node too full?
     727     14512744 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     728        48611 :             return false;
     729     14464133 :         }
     730     14464133 : 
     731     14464133 :         // All clear
     732     14464133 :         self.num_children += 1;
     733     14464133 :         self.keys.extend(&key[self.prefix.len()..]);
     734     14464133 :         self.values.extend(value.0);
     735     14464133 : 
     736     14464133 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     737     14464133 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     738              : 
     739     14464133 :         self.size += self.suffix_len + VALUE_SZ;
     740     14464133 : 
     741     14464133 :         true
     742     14513161 :     }
     743              : 
     744              :     ///
     745              :     /// Perform prefix-compression.
     746              :     ///
     747              :     /// Returns 'true' on success, 'false' if no compression was possible.
     748              :     ///
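                      :     /// Worked example (illustrative), starting from an empty prefix:
                      :     ///
                      :     /// ```text
                      :     /// before:  prefix = ""      suffix_len = 6   keys = "xaaaaa" "xaaaba" "xaaaca"
                      :     /// after:   prefix = "xaaa"  suffix_len = 2   keys = "aa" "ba" "ca"
                      :     /// ```
                      :     ///
                      :     /// Because keys are kept sorted, the common prefix of the first and last key
                      :     /// is shared by every key in between; stripping it saves
                      :     /// `prefix_len * (num_children - 1)` bytes of node space.
                      :     ///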
     749        49013 :     fn compress(&mut self) -> bool {
     750        49013 :         let first_suffix = self.first_suffix();
     751        49013 :         let last_suffix = self.last_suffix();
     752        49013 : 
     753        49013 :         // Find the common prefix among all keys
     754        49013 :         let mut prefix_len = 0;
     755       445683 :         while prefix_len < self.suffix_len {
     756       445683 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     757        49013 :                 break;
     758       396670 :             }
     759       396670 :             prefix_len += 1;
     760              :         }
     761        49013 :         if prefix_len == 0 {
     762        24142 :             return false;
     763        24871 :         }
     764        24871 : 
     765        24871 :         // Can compress. Rewrite the keys without the common prefix.
     766        24871 :         self.prefix.extend(&self.keys[..prefix_len]);
     767        24871 : 
     768        24871 :         let mut new_keys = Vec::new();
     769        24871 :         let mut key_off = 0;
     770      6728021 :         while key_off < self.keys.len() {
     771      6703150 :             let next_key_off = key_off + self.suffix_len;
     772      6703150 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     773      6703150 :             key_off = next_key_off;
     774      6703150 :         }
     775        24871 :         self.keys = new_keys;
     776        24871 :         self.suffix_len -= prefix_len;
     777        24871 : 
     778        24871 :         self.size -= prefix_len * self.num_children as usize;
     779        24871 :         self.size += prefix_len;
     780        24871 : 
     781        24871 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     782        24871 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     783              : 
     784        24871 :         true
     785        49013 :     }
     786              : 
     787              :     ///
     788              :     /// Serialize the node to on-disk format.
     789              :     ///
     790        29389 :     fn pack(&self) -> Bytes {
     791        29389 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     792        29389 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     793        29389 :         assert!(self.num_children > 0);
     794              : 
     795        29389 :         let mut buf = BytesMut::new();
     796        29389 : 
     797        29389 :         buf.put_u16(self.num_children);
     798        29389 :         buf.put_u8(self.level);
     799        29389 :         buf.put_u8(self.prefix.len() as u8);
     800        29389 :         buf.put_u8(self.suffix_len as u8);
     801        29389 :         buf.put(&self.prefix[..]);
     802        29389 :         buf.put(&self.keys[..]);
     803        29389 :         buf.put(&self.values[..]);
     804        29389 : 
     805        29389 :         assert!(buf.len() == self.size);
     806              : 
     807        29389 :         assert!(buf.len() <= PAGE_SZ);
     808        29389 :         buf.resize(PAGE_SZ, 0);
     809        29389 :         buf.freeze()
     810        29389 :     }
     811              : 
     812        74766 :     fn first_suffix(&self) -> &[u8] {
     813        74766 :         &self.keys[..self.suffix_len]
     814        74766 :     }
     815        49013 :     fn last_suffix(&self) -> &[u8] {
     816        49013 :         &self.keys[self.keys.len() - self.suffix_len..]
     817        49013 :     }
     818              : 
     819              :     /// Return the full first key of the page, including the prefix
     820        25753 :     fn first_key(&self) -> [u8; L] {
     821        25753 :         let mut key = [0u8; L];
     822        25753 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     823        25753 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     824        25753 :         key
     825        25753 :     }
     826              : }
     827              : 
     828              : #[cfg(test)]
     829              : pub(crate) mod tests {
     830              :     use std::collections::BTreeMap;
     831              :     use std::sync::atomic::{AtomicUsize, Ordering};
     832              : 
     833              :     use rand::Rng;
     834              : 
     835              :     use super::*;
     836              :     use crate::context::DownloadBehavior;
     837              :     use crate::task_mgr::TaskKind;
     838              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     839              : 
     840              :     #[derive(Clone, Default)]
     841              :     pub(crate) struct TestDisk {
     842              :         blocks: Vec<Bytes>,
     843              :     }
     844              :     impl TestDisk {
     845           20 :         fn new() -> Self {
     846           20 :             Self::default()
     847           20 :         }
     848      2033394 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     849      2033394 :             let mut buf = [0u8; PAGE_SZ];
     850      2033394 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     851      2033394 :             Ok(std::sync::Arc::new(buf).into())
     852      2033394 :         }
     853              :     }
     854              :     impl BlockReader for TestDisk {
     855       822376 :         fn block_cursor(&self) -> BlockCursor<'_> {
     856       822376 :             BlockCursor::new(BlockReaderRef::TestDisk(self))
     857       822376 :         }
     858              :     }
     859              :     impl BlockWriter for &mut TestDisk {
     860          429 :         fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
     861          429 :             let blknum = self.blocks.len();
     862          429 :             self.blocks.push(buf);
     863          429 :             Ok(blknum as u32)
     864          429 :         }
     865              :     }
     866              : 
     867              :     #[tokio::test]
     868            4 :     async fn basic() -> Result<()> {
     869            4 :         let mut disk = TestDisk::new();
     870            4 :         let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
     871            4 : 
     872            4 :         let ctx =
     873            4 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     874            4 : 
     875            4 :         let all_keys: Vec<&[u8; 6]> = vec![
     876            4 :             b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
     877            4 :         ];
     878            4 :         let all_data: Vec<(&[u8; 6], u64)> = all_keys
     879            4 :             .iter()
     880            4 :             .enumerate()
     881           32 :             .map(|(idx, key)| (*key, idx as u64))
     882            4 :             .collect();
     883           32 :         for (key, val) in all_data.iter() {
     884           32 :             writer.append(key, *val)?;
     885            4 :         }
     886            4 : 
     887            4 :         let (root_offset, _writer) = writer.finish()?;
     888            4 : 
     889            4 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     890            4 : 
     891            4 :         reader.dump(&ctx).await?;
     892            4 : 
     893            4 :         // Test the `get` function on all the keys.
     894           32 :         for (key, val) in all_data.iter() {
     895           32 :             assert_eq!(reader.get(key, &ctx).await?, Some(*val));
     896            4 :         }
     897            4 :         // And on some keys that don't exist
     898            4 :         assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
     899            4 :         assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
     900            4 :         assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
     901            4 : 
     902            4 :         // Test search with `visit` function
     903            4 :         let search_key = b"xabaaa";
     904            4 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     905            4 :             .iter()
     906           32 :             .filter(|(key, _value)| key[..] >= search_key[..])
     907           20 :             .map(|(key, value)| (key.to_vec(), *value))
     908            4 :             .collect();
     909            4 : 
     910            4 :         let mut data = Vec::new();
     911            4 :         reader
     912            4 :             .visit(
     913            4 :                 search_key,
     914            4 :                 VisitDirection::Forwards,
     915           20 :                 |key, value| {
     916           20 :                     data.push((key.to_vec(), value));
     917           20 :                     true
     918           20 :                 },
     919            4 :                 &ctx,
     920            4 :             )
     921            4 :             .await?;
     922            4 :         assert_eq!(data, expected);
     923            4 : 
     924            4 :         // Test a backwards scan
     925            4 :         let mut expected: Vec<(Vec<u8>, u64)> = all_data
     926            4 :             .iter()
     927           32 :             .filter(|(key, _value)| key[..] <= search_key[..])
     928           16 :             .map(|(key, value)| (key.to_vec(), *value))
     929            4 :             .collect();
     930            4 :         expected.reverse();
     931            4 :         let mut data = Vec::new();
     932            4 :         reader
     933            4 :             .visit(
     934            4 :                 search_key,
     935            4 :                 VisitDirection::Backwards,
     936           16 :                 |key, value| {
     937           16 :                     data.push((key.to_vec(), value));
     938           16 :                     true
     939           16 :                 },
     940            4 :                 &ctx,
     941            4 :             )
     942            4 :             .await?;
     943            4 :         assert_eq!(data, expected);
     944            4 : 
     945            4 :         // Backward scan where nothing matches
     946            4 :         reader
     947            4 :             .visit(
     948            4 :                 b"aaaaaa",
     949            4 :                 VisitDirection::Backwards,
     950            4 :                 |key, value| {
     951            0 :                     panic!("found unexpected key {}: {}", hex::encode(key), value);
     952            4 :                 },
     953            4 :                 &ctx,
     954            4 :             )
     955            4 :             .await?;
     956            4 : 
     957            4 :         // Full scan
     958            4 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     959            4 :             .iter()
     960           32 :             .map(|(key, value)| (key.to_vec(), *value))
     961            4 :             .collect();
     962            4 :         let mut data = Vec::new();
     963            4 :         reader
     964            4 :             .visit(
     965            4 :                 &[0u8; 6],
     966            4 :                 VisitDirection::Forwards,
     967           32 :                 |key, value| {
     968           32 :                     data.push((key.to_vec(), value));
     969           32 :                     true
     970           32 :                 },
     971            4 :                 &ctx,
     972            4 :             )
     973            4 :             .await?;
     974            4 :         assert_eq!(data, expected);
     975            4 : 
     976            4 :         Ok(())
     977            4 :     }
     978              : 
     979              :     #[tokio::test]
     980            4 :     async fn lots_of_keys() -> Result<()> {
     981            4 :         let mut disk = TestDisk::new();
     982            4 :         let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
     983            4 :         let ctx =
     984            4 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     985            4 : 
     986            4 :         const NUM_KEYS: u64 = 1000;
     987            4 : 
     988            4 :         let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
     989            4 : 
     990         4004 :         for idx in 0..NUM_KEYS {
     991         4000 :             let key_int: u64 = 1 + idx * 2;
     992         4000 :             let key = u64::to_be_bytes(key_int);
     993         4000 :             writer.append(&key, idx)?;
     994            4 : 
     995         4000 :             all_data.insert(key_int, idx);
     996            4 :         }
     997            4 : 
     998            4 :         let (root_offset, _writer) = writer.finish()?;
     999            4 : 
    1000            4 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1001            4 : 
    1002            4 :         reader.dump(&ctx).await?;
    1003            4 : 
    1004            4 :         use std::sync::Mutex;
    1005            4 : 
    1006            4 :         let result = Mutex::new(Vec::new());
    1007            4 :         let limit: AtomicUsize = AtomicUsize::new(10);
    1008       167640 :         let take_ten = |key: &[u8], value: u64| {
    1009       167640 :             let mut keybuf = [0u8; 8];
    1010       167640 :             keybuf.copy_from_slice(key);
    1011       167640 :             let key_int = u64::from_be_bytes(keybuf);
    1012       167640 : 
    1013       167640 :             let mut result = result.lock().unwrap();
    1014       167640 :             result.push((key_int, value));
    1015       167640 : 
    1016       167640 :             // keep going until we have 10 matches
    1017       167640 :             result.len() < limit.load(Ordering::Relaxed)
    1018       167640 :         };
    1019            4 : 
    1020         8040 :         for search_key_int in 0..(NUM_KEYS * 2 + 10) {
    1021         8040 :             let search_key = u64::to_be_bytes(search_key_int);
    1022         8040 :             assert_eq!(
    1023         8040 :                 reader.get(&search_key, &ctx).await?,
    1024         8040 :                 all_data.get(&search_key_int).cloned()
    1025            4 :             );
    1026            4 : 
    1027            4 :             // Test a forward scan starting with this key
    1028         8040 :             result.lock().unwrap().clear();
    1029         8040 :             reader
    1030         8040 :                 .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1031         8040 :                 .await?;
    1032         8040 :             let expected = all_data
    1033         8040 :                 .range(search_key_int..)
    1034         8040 :                 .take(10)
    1035        79640 :                 .map(|(&key, &val)| (key, val))
    1036         8040 :                 .collect::<Vec<(u64, u64)>>();
    1037         8040 :             assert_eq!(*result.lock().unwrap(), expected);
    1038            4 : 
    1039            4 :             // And a backwards scan
    1040         8040 :             result.lock().unwrap().clear();
    1041         8040 :             reader
    1042         8040 :                 .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1043         8040 :                 .await?;
    1044         8040 :             let expected = all_data
    1045         8040 :                 .range(..=search_key_int)
    1046         8040 :                 .rev()
    1047         8040 :                 .take(10)
    1048        80000 :                 .map(|(&key, &val)| (key, val))
    1049         8040 :                 .collect::<Vec<(u64, u64)>>();
    1050         8040 :             assert_eq!(*result.lock().unwrap(), expected);
    1051            4 :         }
    1052            4 : 
    1053            4 :         // full scan
    1054            4 :         let search_key = u64::to_be_bytes(0);
    1055            4 :         limit.store(usize::MAX, Ordering::Relaxed);
    1056            4 :         result.lock().unwrap().clear();
    1057            4 :         reader
    1058            4 :             .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1059            4 :             .await?;
    1060            4 :         let expected = all_data
    1061            4 :             .iter()
    1062         4000 :             .map(|(&key, &val)| (key, val))
    1063            4 :             .collect::<Vec<(u64, u64)>>();
    1064            4 :         assert_eq!(*result.lock().unwrap(), expected);
    1065            4 : 
    1066            4 :         // full scan, backwards
    1067            4 :         let search_key = u64::to_be_bytes(u64::MAX);
    1068            4 :         limit.store(usize::MAX, Ordering::Relaxed);
    1069            4 :         result.lock().unwrap().clear();
    1070            4 :         reader
    1071            4 :             .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1072            4 :             .await?;
    1073            4 :         let expected = all_data
    1074            4 :             .iter()
    1075            4 :             .rev()
    1076         4000 :             .map(|(&key, &val)| (key, val))
    1077            4 :             .collect::<Vec<(u64, u64)>>();
    1078            4 :         assert_eq!(*result.lock().unwrap(), expected);
    1079            4 : 
    1080            4 :         Ok(())
    1081            4 :     }
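
The take_ten closure above stops the traversal by returning false once enough matches have been collected. A small sketch of the same early-exit pattern as a standalone helper, assuming the reader type built in this test; the name first_n is made up for illustration.

async fn first_n(
    reader: &DiskBtreeReader<TestDisk, 8>,
    start: &[u8; 8],
    n: usize,
    ctx: &RequestContext,
) -> Result<Vec<(u64, u64)>> {
    let mut out = Vec::new();
    reader
        .visit(
            start,
            VisitDirection::Forwards,
            |key, value| {
                let key_int = u64::from_be_bytes(key.try_into().unwrap());
                out.push((key_int, value));
                // Returning false ends the traversal early.
                out.len() < n
            },
            ctx,
        )
        .await?;
    Ok(out)
}

The test instead keeps the cut-off in an AtomicUsize so the same closure can be reused for both the ten-item scans and the later full scans.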
    1082              : 
    1083              :     #[tokio::test]
    1084            4 :     async fn random_data() -> Result<()> {
    1085            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1086            4 : 
    1087            4 :         // Generate random keys with exponential distribution, to
    1088            4 :         // exercise the prefix compression
    1089            4 :         const NUM_KEYS: usize = 100000;
    1090            4 :         let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
    1091       400004 :         for idx in 0..NUM_KEYS {
    1092       400000 :             let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
    1093       400000 :             let t = -(f64::ln(u));
    1094       400000 :             let key_int = (t * 1000000.0) as u128;
    1095       400000 : 
    1096       400000 :             all_data.insert(key_int, idx as u64);
    1097       400000 :         }
    1098            4 : 
    1099            4 :         // Build a tree from it
    1100            4 :         let mut disk = TestDisk::new();
    1101            4 :         let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
    1102            4 : 
    1103       390152 :         for (&key, &val) in all_data.iter() {
    1104       390152 :             writer.append(&u128::to_be_bytes(key), val)?;
    1105            4 :         }
    1106            4 :         let (root_offset, _writer) = writer.finish()?;
    1107            4 : 
    1108            4 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1109            4 : 
    1110            4 :         // Test get() operation on all the keys
    1111       390152 :         for (&key, &val) in all_data.iter() {
    1112       390152 :             let search_key = u128::to_be_bytes(key);
    1113       390152 :             assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
    1114            4 :         }
    1115            4 : 
    1116            4 :         // Test get() operations on random keys, most of which will not exist
    1117       400004 :         for _ in 0..100000 {
    1118       400000 :             let key_int = rand::thread_rng().r#gen::<u128>();
    1119       400000 :             let search_key = u128::to_be_bytes(key_int);
    1120       400000 :             assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
    1121            4 :         }
    1122            4 : 
    1123            4 :         // Test boundary cases
    1124            4 :         assert!(
    1125            4 :             reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
    1126            4 :                 == all_data.get(&u128::MIN).cloned()
    1127            4 :         );
    1128            4 :         assert!(
    1129            4 :             reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
    1130            4 :                 == all_data.get(&u128::MAX).cloned()
    1131            4 :         );
    1132            4 : 
    1133            4 :         // Test the iterator API
    1134            4 :         let mut iter = reader.iter(&[0; 16], &ctx);
    1135            4 :         let mut cnt = 0;
    1136       390156 :         while let Some(res) = iter.next().await {
    1137       390152 :             let (key, val) = res?;
    1138       390152 :             let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
    1139       390152 :             assert_eq!(val, *all_data.get(&key).unwrap());
    1140       390152 :             cnt += 1;
    1141            4 :         }
    1142            4 :         assert_eq!(cnt, all_data.len());
    1143            4 : 
    1144            4 :         Ok(())
    1145            4 :     }
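
The loop above drains the iterator from the smallest possible key. The same API can resume from an arbitrary start key; a brief sketch under the assumption that iter() yields Result<(Vec<u8>, u64)> items exactly as in the loop above (count_from is a made-up helper name).

async fn count_from(
    reader: DiskBtreeReader<TestDisk, 16>,
    start_key: u128,
    ctx: &RequestContext,
) -> Result<usize> {
    // Keep the start key in a binding so the iterator can borrow it.
    let start = u128::to_be_bytes(start_key);
    let mut iter = reader.iter(&start, ctx);
    let mut cnt = 0;
    while let Some(res) = iter.next().await {
        // Each item is a (key bytes, value) pair, in ascending key order.
        let (_key, _val) = res?;
        cnt += 1;
    }
    Ok(cnt)
}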
    1146              : 
    1147              :     #[test]
    1148            4 :     fn unsorted_input() {
    1149            4 :         let mut disk = TestDisk::new();
    1150            4 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1151            4 : 
    1152            4 :         let _ = writer.append(b"ba", 1);
    1153            4 :         let _ = writer.append(b"bb", 2);
    1154            4 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1155            4 :         match err {
    1156            4 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1157            4 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1158            4 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1159              :             }
    1160            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1161              :         }
    1162            4 :     }
    1163              : 
    1164              :     ///
    1165              :     /// This test contains a particular data set, see disk_btree_test_data.rs
    1166              :     ///
    1167              :     #[tokio::test]
    1168            4 :     async fn particular_data() -> Result<()> {
    1169            4 :         // Build a tree from it
    1170            4 :         let mut disk = TestDisk::new();
    1171            4 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1172            4 :         let ctx =
    1173            4 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1174            4 : 
    1175         8004 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1176         8000 :             writer.append(&key, val)?;
    1177            4 :         }
    1178            4 :         let (root_offset, writer) = writer.finish()?;
    1179            4 : 
    1180            4 :         println!("SIZE: {} blocks", writer.blocks.len());
    1181            4 : 
    1182            4 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1183            4 : 
    1184            4 :         // Test get() operation on all the keys
    1185         8004 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1186         8000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1187            4 :         }
    1188            4 : 
    1189            4 :         // Test full scan
    1190            4 :         let mut count = 0;
    1191            4 :         reader
    1192            4 :             .visit(
    1193            4 :                 &[0u8; 26],
    1194            4 :                 VisitDirection::Forwards,
    1195         8000 :                 |_key, _value| {
    1196         8000 :                     count += 1;
    1197         8000 :                     true
    1198         8000 :                 },
    1199            4 :                 &ctx,
    1200            4 :             )
    1201            4 :             .await?;
    1202            4 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1203            4 : 
    1204            4 :         reader.dump(&ctx).await?;
    1205            4 : 
    1206            4 :         Ok(())
    1207            4 :     }
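
The SIZE printout above reports the number of pages accumulated in TestDisk. A tiny sketch of turning that count into bytes, assuming every block emitted by the builder is one full PAGE_SZ page (as the fixed-page layout implies) and relying on the same blocks field the println! above reads; tree_size_bytes is a made-up name.

fn tree_size_bytes(disk: &TestDisk) -> usize {
    // Each block written by DiskBtreeBuilder is one fixed-size page.
    disk.blocks.len() * PAGE_SZ
}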
    1208              : }
    1209              : 
    1210              : #[cfg(test)]
    1211              : #[path = "disk_btree_test_data.rs"]
    1212              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta