|             Line data    Source code 
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use async_stream::try_stream;
      22              : use byteorder::{ReadBytesExt, BE};
      23              : use bytes::{BufMut, Bytes, BytesMut};
      24              : use either::Either;
      25              : use futures::{Stream, StreamExt};
      26              : use hex;
      27              : use std::{
      28              :     cmp::Ordering,
      29              :     io,
      30              :     iter::Rev,
      31              :     ops::{Range, RangeInclusive},
      32              :     result,
      33              : };
      34              : use thiserror::Error;
      35              : use tracing::error;
      36              : 
      37              : use crate::{
      38              :     context::{DownloadBehavior, RequestContext},
      39              :     task_mgr::TaskKind,
      40              :     tenant::block_io::{BlockReader, BlockWriter},
      41              : };
      42              : 
/// Width in bytes of every value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
/// Largest value accepted by `DiskBtreeBuilder::append` (2^39 - 1). Staying at
/// or below it keeps the top bit of the first encoded byte clear; that bit
/// (0x80) is what `Value::from_blknum` sets when it stores a block number.
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;

/// Size in bytes of the pages that `OnDiskNode::deparse` interprets as nodes.
pub const PAGE_SZ: usize = 8192;
      48              : 
      49              : #[derive(Clone, Copy, Debug)]
      50              : struct Value([u8; VALUE_SZ]);
      51              : 
      52              : impl Value {
      53      4674869 :     fn from_slice(slice: &[u8]) -> Value {
      54      4674869 :         let mut b = [0u8; VALUE_SZ];
      55      4674869 :         b.copy_from_slice(slice);
      56      4674869 :         Value(b)
      57      4674869 :     }
      58              : 
      59      7187463 :     fn from_u64(x: u64) -> Value {
      60      7187463 :         assert!(x <= 0x007f_ffff_ffff);
      61      7187463 :         Value([
      62      7187463 :             (x >> 32) as u8,
      63      7187463 :             (x >> 24) as u8,
      64      7187463 :             (x >> 16) as u8,
      65      7187463 :             (x >> 8) as u8,
      66      7187463 :             x as u8,
      67      7187463 :         ])
      68      7187463 :     }
      69              : 
      70        12859 :     fn from_blknum(x: u32) -> Value {
      71        12859 :         Value([
      72        12859 :             0x80,
      73        12859 :             (x >> 24) as u8,
      74        12859 :             (x >> 16) as u8,
      75        12859 :             (x >> 8) as u8,
      76        12859 :             x as u8,
      77        12859 :         ])
      78        12859 :     }
      79              : 
      80              :     #[allow(dead_code)]
      81            0 :     fn is_offset(self) -> bool {
      82            0 :         self.0[0] & 0x80 != 0
      83            0 :     }
      84              : 
      85      4025087 :     fn to_u64(self) -> u64 {
      86      4025087 :         let b = &self.0;
      87      4025087 :         (b[0] as u64) << 32
      88      4025087 :             | (b[1] as u64) << 24
      89      4025087 :             | (b[2] as u64) << 16
      90      4025087 :             | (b[3] as u64) << 8
      91      4025087 :             | b[4] as u64
      92      4025087 :     }
      93              : 
      94       643758 :     fn to_blknum(self) -> u32 {
      95       643758 :         let b = &self.0;
      96       643758 :         assert!(b[0] == 0x80);
      97       643758 :         (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
      98       643758 :     }
      99              : }
     100              : 
/// Errors returned by the B-tree builder and reader in this module.
#[derive(Error, Debug)]
pub enum DiskBtreeError {
    /// `append` was called with a value greater than `MAX_VALUE`; the payload
    /// is the rejected value.
    #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
    AppendOverflow(u64),

    /// `append` was called with a key that is not strictly greater than the
    /// previously appended key.
    #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
    UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },

    /// NOTE(review): presumably returned by the builder when an entry does not
    /// fit even into a freshly started leaf node — confirm against
    /// `append_internal`.
    #[error("Could not push to new leaf node")]
    FailedToPushToNewLeafNode,

    /// An underlying I/O error. `io::Error` converts into this variant
    /// automatically when propagated with `?`.
    #[error("IoError: {0}")]
    Io(#[from] io::Error),
}

/// Result type used throughout this module; the error is [`DiskBtreeError`].
pub type Result<T> = result::Result<T, DiskBtreeError>;
     117              : 
/// This is the on-disk representation.
///
/// The variable-length parts borrow from the page buffer the node was parsed
/// from (lifetime `'a`). `L` is the length in bytes of the search keys that
/// `binary_search` accepts.
struct OnDiskNode<'a, const L: usize> {
    // Fixed-width fields
    /// Number of key/value entries stored in this node.
    num_children: u16,
    /// Level of the node in the tree; readers treat level 0 as a leaf.
    level: u8,
    /// Length in bytes of `prefix`.
    prefix_len: u8,
    /// Length in bytes of each entry in `keys`.
    suffix_len: u8,

    // Variable-length fields. These are stored on-disk after the fixed-width
    // fields, in this order. In the in-memory representation, these point to
    // the right parts in the page buffer.
    /// Leading key bytes; readers place them in front of each suffix from
    /// `keys` to rebuild a complete key.
    prefix: &'a [u8],
    /// `num_children` key suffixes of `suffix_len` bytes each, back to back.
    keys: &'a [u8],
    /// `num_children` values of `VALUE_SZ` bytes each, back to back.
    values: &'a [u8],
}
     133              : 
     134              : impl<'a, const L: usize> OnDiskNode<'a, L> {
     135              :     ///
     136              :     /// Interpret a PAGE_SZ page as a node.
     137              :     ///
     138      1541967 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     139      1541967 :         let mut cursor = std::io::Cursor::new(buf);
     140      1541967 :         let num_children = cursor.read_u16::<BE>()?;
     141      1541967 :         let level = cursor.read_u8()?;
     142      1541967 :         let prefix_len = cursor.read_u8()?;
     143      1541967 :         let suffix_len = cursor.read_u8()?;
     144              : 
     145      1541967 :         let mut off = cursor.position();
     146      1541967 :         let prefix_off = off as usize;
     147      1541967 :         off += prefix_len as u64;
     148      1541967 : 
     149      1541967 :         let keys_off = off as usize;
     150      1541967 :         let keys_len = num_children as usize * suffix_len as usize;
     151      1541967 :         off += keys_len as u64;
     152      1541967 : 
     153      1541967 :         let values_off = off as usize;
     154      1541967 :         let values_len = num_children as usize * VALUE_SZ;
     155      1541967 :         //off += values_len as u64;
     156      1541967 : 
     157      1541967 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     158      1541967 :         let keys = &buf[keys_off..keys_off + keys_len];
     159      1541967 :         let values = &buf[values_off..values_off + values_len];
     160      1541967 : 
     161      1541967 :         Ok(OnDiskNode {
     162      1541967 :             num_children,
     163      1541967 :             level,
     164      1541967 :             prefix_len,
     165      1541967 :             suffix_len,
     166      1541967 :             prefix,
     167      1541967 :             keys,
     168      1541967 :             values,
     169      1541967 :         })
     170      1541967 :     }
     171              : 
     172              :     ///
     173              :     /// Read a value at 'idx'
     174              :     ///
     175      4674869 :     fn value(&self, idx: usize) -> Value {
     176      4674869 :         let value_off = idx * VALUE_SZ;
     177      4674869 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     178      4674869 :         Value::from_slice(value_slice)
     179      4674869 :     }
     180              : 
     181      1330638 :     fn binary_search(
     182      1330638 :         &self,
     183      1330638 :         search_key: &[u8; L],
     184      1330638 :         keybuf: &mut [u8],
     185      1330638 :     ) -> result::Result<usize, usize> {
     186      1330638 :         let mut size = self.num_children as usize;
     187      1330638 :         let mut low = 0;
     188      1330638 :         let mut high = size;
     189     10230832 :         while low < high {
     190      9112908 :             let mid = low + size / 2;
     191      9112908 : 
     192      9112908 :             let key_off = mid * self.suffix_len as usize;
     193      9112908 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     194      9112908 :             // Does this match?
     195      9112908 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     196      9112908 : 
     197      9112908 :             let cmp = keybuf[..].cmp(search_key);
     198      9112908 : 
     199      9112908 :             if cmp == Ordering::Less {
     200      5888814 :                 low = mid + 1;
     201      5888814 :             } else if cmp == Ordering::Greater {
     202      3011380 :                 high = mid;
     203      3011380 :             } else {
     204       212714 :                 return Ok(mid);
     205              :             }
     206      8900194 :             size = high - low;
     207              :         }
     208      1117924 :         Err(low)
     209      1330638 :     }
     210              : }
     211              : 
///
/// Public reader object, to search the tree.
///
/// `L` is the length in bytes of the keys passed to the lookup methods.
///
pub struct DiskBtreeReader<R, const L: usize>
where
    R: BlockReader,
{
    /// Added to every node block number before the block is read from `reader`.
    start_blk: u32,
    /// Block number of the root node, relative to `start_blk`.
    root_blk: u32,
    /// Source of the blocks; accessed through `block_cursor()`.
    reader: R,
}
     223              : 
/// Scan direction for `DiskBtreeReader::visit`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VisitDirection {
    /// Visit keys >= the search key, walking node entries by increasing index.
    Forwards,
    /// Visit keys <= the search key, walking node entries by decreasing index.
    Backwards,
}
     229              : 
     230              : impl<R, const L: usize> DiskBtreeReader<R, L>
     231              : where
     232              :     R: BlockReader,
     233              : {
     234       211811 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     235       211811 :         DiskBtreeReader {
     236       211811 :             start_blk,
     237       211811 :             root_blk,
     238       211811 :             reader,
     239       211811 :         }
     240       211811 :     }
     241              : 
     242              :     ///
     243              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     244              :     ///
     245       410081 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     246       410081 :         let mut result: Option<u64> = None;
     247       410081 :         self.visit(
     248       410081 :             search_key,
     249       410081 :             VisitDirection::Forwards,
     250       410081 :             |key, value| {
     251       210057 :                 if key == search_key {
     252       208048 :                     result = Some(value);
     253       208048 :                 }
     254       210057 :                 false
     255       410081 :             },
     256       410081 :             ctx,
     257       410081 :         )
     258          409 :         .await?;
     259       410081 :         Ok(result)
     260       410081 :     }
     261              : 
     262            2 :     pub fn iter<'a>(
     263            2 :         &'a self,
     264            2 :         start_key: &'a [u8; L],
     265            2 :         ctx: &'a RequestContext,
     266            2 :     ) -> DiskBtreeIterator<'a> {
     267            2 :         DiskBtreeIterator {
     268            2 :             stream: Box::pin(self.get_stream_from(start_key, ctx)),
     269            2 :         }
     270            2 :     }
     271              : 
    /// Return a stream which yields all key, value pairs from the index
    /// starting from the first key greater or equal to `start_key`.
    ///
    /// Each item is `Ok((key, value))` with the full key bytes, or an error if
    /// reading or parsing a node failed.
    ///
    /// Note that this is a copy of [`Self::visit`].
    /// TODO: Once the sequential read path is removed this will become
    /// the only index traversal method.
    pub fn get_stream_from<'a>(
        &'a self,
        start_key: &'a [u8; L],
        ctx: &'a RequestContext,
    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
        try_stream! {
            // Explicit traversal stack of (node block number, position within
            // that node). `None` means the node has not been searched yet.
            let mut stack = Vec::new();
            stack.push((self.root_blk, None));
            let block_cursor = self.reader.block_cursor();
            while let Some((node_blknum, opt_iter)) = stack.pop() {
                // Locate the node.
                let node_buf = block_cursor
                    .read_blk(self.start_blk + node_blknum, ctx)
                    .await?;

                let node = OnDiskNode::deparse(node_buf.as_ref())?;
                let prefix_len = node.prefix_len as usize;
                let suffix_len = node.suffix_len as usize;

                assert!(node.num_children > 0);

                // Scratch buffer for full keys: the node's prefix followed by
                // room for one suffix.
                let mut keybuf = Vec::new();
                keybuf.extend(node.prefix);
                keybuf.resize(prefix_len + suffix_len, 0);

                let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
                    // Resuming a node we already descended from.
                    iter
                } else {
                    // Locate the first match
                    let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
                        Ok(idx) => idx,
                        Err(idx) => {
                            if node.level == 0 {
                                // Imagine that the node contains the following keys:
                                //
                                // 1
                                // 3  <-- idx
                                // 5
                                //
                                // If the search key is '2' and there is no exact match,
                                // the binary search would return the index of key
                                // '3'. That's cool, '3' is the first key to return.
                                idx
                            } else {
                                // This is an internal page, so each key represents a lower
                                // bound for what's in the child page. If there is no exact
                                // match, we have to return the *previous* entry.
                                //
                                // 1  <-- return this
                                // 3  <-- idx
                                // 5
                                idx.saturating_sub(1)
                            }
                        }
                    };
                    Either::Left(idx..node.num_children.into())
                };

                // idx points to the first match now. Keep going from there
                while let Some(idx) = iter.next() {
                    let key_off = idx * suffix_len;
                    let suffix = &node.keys[key_off..key_off + suffix_len];
                    keybuf[prefix_len..].copy_from_slice(suffix);
                    let value = node.value(idx);
                    #[allow(clippy::collapsible_if)]
                    if node.level == 0 {
                        // leaf
                        yield (keybuf.clone(), value.to_u64());
                    } else {
                        // Internal node: remember where to continue in this
                        // node, then descend into the child first.
                        stack.push((node_blknum, Some(iter)));
                        stack.push((value.to_blknum(), None));
                        break;
                    }
                }
            }
        }
    }
     355              : 
    ///
    /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
    /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
    /// backwards)
    ///
    /// The visitor receives the full key and the value, and returns `true` to
    /// continue the scan or `false` to stop it.
    ///
    /// Returns `Ok(true)` if the scan ran to the end of the tree. Returns
    /// `Ok(false)` if the visitor stopped it, or if a backwards scan reached a
    /// node in which every key is greater than 'search_key'.
    ///
    pub async fn visit<V>(
        &self,
        search_key: &[u8; L],
        dir: VisitDirection,
        mut visitor: V,
        ctx: &RequestContext,
    ) -> Result<bool>
    where
        V: FnMut(&[u8], u64) -> bool,
    {
        // Explicit traversal stack of (node block number, position within that
        // node). `None` means the node has not been searched yet.
        let mut stack = Vec::new();
        stack.push((self.root_blk, None));
        let block_cursor = self.reader.block_cursor();
        while let Some((node_blknum, opt_iter)) = stack.pop() {
            // Locate the node.
            let node_buf = block_cursor
                .read_blk(self.start_blk + node_blknum, ctx)
                .await?;

            let node = OnDiskNode::deparse(node_buf.as_ref())?;
            let prefix_len = node.prefix_len as usize;
            let suffix_len = node.suffix_len as usize;

            assert!(node.num_children > 0);

            // Scratch buffer for full keys: the node's prefix followed by room
            // for one suffix.
            let mut keybuf = Vec::new();
            keybuf.extend(node.prefix);
            keybuf.resize(prefix_len + suffix_len, 0);

            let mut iter = if let Some(iter) = opt_iter {
                // Resuming a node we already descended from.
                iter
            } else if dir == VisitDirection::Forwards {
                // Locate the first match
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => idx,
                    Err(idx) => {
                        if node.level == 0 {
                            // Imagine that the node contains the following keys:
                            //
                            // 1
                            // 3  <-- idx
                            // 5
                            //
                            // If the search key is '2' and there is no exact match,
                            // the binary search would return the index of key
                            // '3'. That's cool, '3' is the first key to return.
                            idx
                        } else {
                            // This is an internal page, so each key represents a lower
                            // bound for what's in the child page. If there is no exact
                            // match, we have to return the *previous* entry.
                            //
                            // 1  <-- return this
                            // 3  <-- idx
                            // 5
                            idx.saturating_sub(1)
                        }
                    }
                };
                Either::Left(idx..node.num_children.into())
            } else {
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => {
                        // Exact match. That's the first entry to return, and walk
                        // backwards from there.
                        idx
                    }
                    Err(idx) => {
                        // No exact match. The binary search returned the index of the
                        // first key that's > search_key. Back off by one, and walk
                        // backwards from there.
                        if let Some(idx) = idx.checked_sub(1) {
                            idx
                        } else {
                            // Every key in this node is > search_key.
                            return Ok(false);
                        }
                    }
                };
                Either::Right((0..=idx).rev())
            };

            // idx points to the first match now. Keep going from there
            while let Some(idx) = iter.next() {
                let key_off = idx * suffix_len;
                let suffix = &node.keys[key_off..key_off + suffix_len];
                keybuf[prefix_len..].copy_from_slice(suffix);
                let value = node.value(idx);
                #[allow(clippy::collapsible_if)]
                if node.level == 0 {
                    // leaf
                    if !visitor(&keybuf, value.to_u64()) {
                        return Ok(false);
                    }
                } else {
                    // Internal node: remember where to continue in this node,
                    // then descend into the child first.
                    stack.push((node_blknum, Some(iter)));
                    stack.push((value.to_blknum(), None));
                    break;
                }
            }
        }
        Ok(true)
    }
     463              : 
     464              :     #[allow(dead_code)]
     465           10 :     pub async fn dump(&self) -> Result<()> {
     466           10 :         let mut stack = Vec::new();
     467           10 :         let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     468           10 : 
     469           10 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     470           10 : 
     471           10 :         let block_cursor = self.reader.block_cursor();
     472              : 
     473         6042 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     474         6032 :             let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
     475         6032 :             let buf: &[u8] = blk.as_ref();
     476         6032 :             let node = OnDiskNode::<L>::deparse(buf)?;
     477              : 
     478         6032 :             if child_idx == 0 {
     479           18 :                 print!("{:indent$}", "", indent = depth * 2);
     480           18 :                 let path_prefix = stack
     481           18 :                     .iter()
     482           18 :                     .map(|(_blknum, path, ..)| path.as_str())
     483           18 :                     .collect::<String>();
     484           18 :                 println!(
     485           18 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     486           18 :                     hex::encode(node.prefix),
     487           18 :                     node.suffix_len
     488           18 :                 );
     489         6014 :             }
     490              : 
     491         6032 :             if child_idx + 1 < node.num_children {
     492         6014 :                 let key_off = key_off + node.suffix_len as usize;
     493         6014 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     494         6014 :             }
     495         6032 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     496         6032 :             let val = node.value(child_idx as usize);
     497         6032 : 
     498         6032 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     499         6032 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     500         6032 : 
     501         6032 :             if node.level > 0 {
     502            8 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     503         6024 :             }
     504              :         }
     505           10 :         Ok(())
     506           10 :     }
     507              : }
     508              : 
/// Handle returned by `DiskBtreeReader::iter`: a boxed, pinned stream of
/// `(key, value)` pairs (or errors) that is advanced with `next()`.
pub struct DiskBtreeIterator<'a> {
    // The stream's type cannot be named, so it is stored as a pinned trait object.
    #[allow(clippy::type_complexity)]
    stream: std::pin::Pin<
        Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
    >,
}
     515              : 
     516              : impl<'a> DiskBtreeIterator<'a> {
     517       195033 :     pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
     518       195033 :         self.stream.next().await
     519       195033 :     }
     520              : }
     521              : 
///
/// Public builder object, for creating a new tree.
///
/// Usage: Create a builder object by calling 'new', load all the data into the
/// tree by calling 'append' for each key-value pair in increasing key order,
/// and then call 'finish'
///
/// 'L' is the key length in bytes
pub struct DiskBtreeBuilder<W, const L: usize>
where
    W: BlockWriter,
{
    // NOTE(review): presumably where finished node pages are written — confirm
    // in `flush_node`.
    writer: W,

    ///
    /// `stack[0]` is the current root page, `stack.last()` is the leaf.
    ///
    /// We maintain the length of the stack to be always greater than zero.
    /// Two exceptions are:
    /// 1. `Self::flush_node`. The method pushes a new node if it extracted the
    ///    last one. Other methods cannot observe the intermediate state, so
    ///    the invariant still holds for them.
    /// 2. `Self::finish`. It consumes self and does not return it,
    ///    which means that this is where the structure is destroyed.
    ///    Thus a stack of zero length cannot be observed by other methods.
    stack: Vec<BuildNode<L>>,

    /// Last key that was appended to the tree. Used to sanity check that append
    /// is called in increasing key order.
    last_key: Option<[u8; L]>,
}
     551              : 
     552              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     553              : where
     554              :     W: BlockWriter,
     555              : {
     556         1563 :     pub fn new(writer: W) -> Self {
     557         1563 :         DiskBtreeBuilder {
     558         1563 :             writer,
     559         1563 :             last_key: None,
     560         1563 :             stack: vec![BuildNode::new(0)],
     561         1563 :         }
     562         1563 :     }
     563              : 
     564      7187465 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     565      7187465 :         if value > MAX_VALUE {
     566            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     567      7187465 :         }
     568      7187465 :         if let Some(last_key) = &self.last_key {
     569      7185908 :             if key <= last_key {
     570            2 :                 return Err(DiskBtreeError::UnsortedInput {
     571            2 :                     key: key.as_slice().into(),
     572            2 :                     last_key: last_key.as_slice().into(),
     573            2 :                 });
     574      7185906 :             }
     575         1557 :         }
     576      7187463 :         self.last_key = Some(*key);
     577      7187463 : 
     578      7187463 :         self.append_internal(key, Value::from_u64(value))
     579      7187465 :     }
     580              : 
     581      7200322 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     582      7200322 :         // Try to append to the current leaf buffer
     583      7200322 :         let last = self
     584      7200322 :             .stack
     585      7200322 :             .last_mut()
     586      7200322 :             .expect("should always have at least one item");
     587      7200322 :         let level = last.level;
     588      7200322 :         if last.push(key, value) {
     589      7175874 :             return Ok(());
     590        24448 :         }
     591        24448 : 
     592        24448 :         // It did not fit. Try to compress, and if it succeeds to make
     593        24448 :         // some room on the node, try appending to it again.
     594        24448 :         #[allow(clippy::collapsible_if)]
     595        24448 :         if last.compress() {
     596        12386 :             if last.push(key, value) {
     597        12377 :                 return Ok(());
     598            9 :             }
     599        12062 :         }
     600              : 
     601              :         // Could not append to the current leaf. Flush it and create a new one.
     602        12071 :         self.flush_node()?;
     603              : 
     604              :         // Replace the node we flushed with an empty one and append the new
     605              :         // key to it.
     606        12071 :         let mut last = BuildNode::new(level);
     607        12071 :         if !last.push(key, value) {
     608            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     609        12071 :         }
     610        12071 : 
     611        12071 :         self.stack.push(last);
     612        12071 : 
     613        12071 :         Ok(())
     614      7200322 :     }
     615              : 
     616              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     617              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     618              :     /// creates a new root page, increasing the height of the tree.
     619        12859 :     fn flush_node(&mut self) -> Result<()> {
     620        12859 :         // Get the current bottommost node in the stack and flush it to disk.
     621        12859 :         let last = self
     622        12859 :             .stack
     623        12859 :             .pop()
     624        12859 :             .expect("should always have at least one item");
     625        12859 :         let buf = last.pack();
     626        12859 :         let downlink_key = last.first_key();
     627        12859 :         let downlink_ptr = self.writer.write_blk(buf)?;
     628              : 
     629              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     630              :         // create a new root page, increasing the height of the tree.
     631        12859 :         if self.stack.is_empty() {
     632          788 :             self.stack.push(BuildNode::new(last.level + 1));
     633        12071 :         }
     634        12859 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     635        12859 :     }
     636              : 
     637              :     ///
     638              :     /// Flushes everything to disk, and returns the block number of the root page.
     639              :     /// The caller must store the root block number "out-of-band", and pass it
     640              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     641              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     642              :     /// in the summary header)
     643              :     ///
     644         1555 :     pub fn finish(mut self) -> Result<(u32, W)> {
     645              :         // flush all levels, except the root.
     646         2343 :         while self.stack.len() > 1 {
     647          788 :             self.flush_node()?;
     648              :         }
     649              : 
     650         1555 :         let root = self
     651         1555 :             .stack
     652         1555 :             .first()
     653         1555 :             .expect("by the check above we left one item there");
     654         1555 :         let buf = root.pack();
     655         1555 :         let root_blknum = self.writer.write_blk(buf)?;
     656              : 
     657         1555 :         Ok((root_blknum, self.writer))
     658         1555 :     }
     659              : 
    /// Returns a shared reference to the block writer this builder writes its
    /// pages to.
    pub fn borrow_writer(&self) -> &W {
        &self.writer
    }
     663              : }
     664              : 
///
/// BuildNode represents an incomplete page that we are appending to.
///
#[derive(Clone, Debug)]
struct BuildNode<const L: usize> {
    /// Number of key-value pairs pushed to this node so far.
    num_children: u16,
    /// Level of the node in the tree. The builder starts with a level-0 node,
    /// and each new root is created one level above the node it replaces.
    level: u8,
    /// Leading bytes shared by every key on the page. `compress` moves them
    /// here from `keys`.
    prefix: Vec<u8>,
    /// Length in bytes of each key suffix stored in `keys`.
    suffix_len: usize,

    /// Key suffixes, concatenated: `num_children * suffix_len` bytes.
    keys: Vec<u8>,
    /// Values, concatenated: `num_children * VALUE_SZ` bytes.
    values: Vec<u8>,

    size: usize, // physical size of this node, if it was written to disk like this
}
     680              : 
/// Size limit of a packed node: one page. `BuildNode::push` refuses any entry
/// that would bring the node's size up to this limit.
const NODE_SIZE: usize = PAGE_SZ;

/// Size of the fixed page header written by `BuildNode::pack`:
/// number of children (u16) + level (u8) + prefix length (u8) + suffix length (u8).
const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     684              : 
     685              : impl<const L: usize> BuildNode<L> {
     686        14422 :     fn new(level: u8) -> Self {
     687        14422 :         BuildNode {
     688        14422 :             num_children: 0,
     689        14422 :             level,
     690        14422 :             prefix: Vec::new(),
     691        14422 :             suffix_len: 0,
     692        14422 :             keys: Vec::new(),
     693        14422 :             values: Vec::new(),
     694        14422 :             size: NODE_HDR_SIZE,
     695        14422 :         }
     696        14422 :     }
     697              : 
     698              :     /// Try to append a key-value pair to this node. Returns 'true' on
     699              :     /// success, 'false' if the page was full or the key was
     700              :     /// incompatible with the prefix of the existing keys.
     701      7224779 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     702      7224779 :         // If we have already performed prefix-compression on the page,
     703      7224779 :         // check that the incoming key has the same prefix.
     704      7224779 :         if self.num_children > 0 {
     705              :             // does the prefix allow it?
     706      7210363 :             if !key.starts_with(&self.prefix) {
     707          217 :                 return false;
     708      7210146 :             }
     709        14416 :         } else {
     710        14416 :             self.suffix_len = key.len();
     711        14416 :         }
     712              : 
     713              :         // Is the node too full?
     714      7224562 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     715        24240 :             return false;
     716      7200322 :         }
     717      7200322 : 
     718      7200322 :         // All clear
     719      7200322 :         self.num_children += 1;
     720      7200322 :         self.keys.extend(&key[self.prefix.len()..]);
     721      7200322 :         self.values.extend(value.0);
     722      7200322 : 
     723      7200322 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     724      7200322 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     725              : 
     726      7200322 :         self.size += self.suffix_len + VALUE_SZ;
     727      7200322 : 
     728      7200322 :         true
     729      7224779 :     }
     730              : 
     731              :     ///
     732              :     /// Perform prefix-compression.
     733              :     ///
     734              :     /// Returns 'true' on success, 'false' if no compression was possible.
     735              :     ///
     736        24448 :     fn compress(&mut self) -> bool {
     737        24448 :         let first_suffix = self.first_suffix();
     738        24448 :         let last_suffix = self.last_suffix();
     739        24448 : 
     740        24448 :         // Find the common prefix among all keys
     741        24448 :         let mut prefix_len = 0;
     742       221948 :         while prefix_len < self.suffix_len {
     743       221948 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     744        24448 :                 break;
     745       197500 :             }
     746       197500 :             prefix_len += 1;
     747              :         }
     748        24448 :         if prefix_len == 0 {
     749        12062 :             return false;
     750        12386 :         }
     751        12386 : 
     752        12386 :         // Can compress. Rewrite the keys without the common prefix.
     753        12386 :         self.prefix.extend(&self.keys[..prefix_len]);
     754        12386 : 
     755        12386 :         let mut new_keys = Vec::new();
     756        12386 :         let mut key_off = 0;
     757      3349505 :         while key_off < self.keys.len() {
     758      3337119 :             let next_key_off = key_off + self.suffix_len;
     759      3337119 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     760      3337119 :             key_off = next_key_off;
     761      3337119 :         }
     762        12386 :         self.keys = new_keys;
     763        12386 :         self.suffix_len -= prefix_len;
     764        12386 : 
     765        12386 :         self.size -= prefix_len * self.num_children as usize;
     766        12386 :         self.size += prefix_len;
     767        12386 : 
     768        12386 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     769        12386 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     770              : 
     771        12386 :         true
     772        24448 :     }
     773              : 
     774              :     ///
     775              :     /// Serialize the node to on-disk format.
     776              :     ///
     777        14414 :     fn pack(&self) -> Bytes {
     778        14414 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     779        14414 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     780        14414 :         assert!(self.num_children > 0);
     781              : 
     782        14414 :         let mut buf = BytesMut::new();
     783        14414 : 
     784        14414 :         buf.put_u16(self.num_children);
     785        14414 :         buf.put_u8(self.level);
     786        14414 :         buf.put_u8(self.prefix.len() as u8);
     787        14414 :         buf.put_u8(self.suffix_len as u8);
     788        14414 :         buf.put(&self.prefix[..]);
     789        14414 :         buf.put(&self.keys[..]);
     790        14414 :         buf.put(&self.values[..]);
     791        14414 : 
     792        14414 :         assert!(buf.len() == self.size);
     793              : 
     794        14414 :         assert!(buf.len() <= PAGE_SZ);
     795        14414 :         buf.resize(PAGE_SZ, 0);
     796        14414 :         buf.freeze()
     797        14414 :     }
     798              : 
    /// Return the stored suffix of the first key on the page, i.e. the first
    /// `suffix_len` bytes of `keys`.
    fn first_suffix(&self) -> &[u8] {
        &self.keys[..self.suffix_len]
    }
    /// Return the stored suffix of the last key on the page, i.e. the final
    /// `suffix_len` bytes of `keys`.
    fn last_suffix(&self) -> &[u8] {
        &self.keys[self.keys.len() - self.suffix_len..]
    }
     805              : 
     806              :     /// Return the full first key of the page, including the prefix
     807        12859 :     fn first_key(&self) -> [u8; L] {
     808        12859 :         let mut key = [0u8; L];
     809        12859 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     810        12859 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     811        12859 :         key
     812        12859 :     }
     813              : }
     814              : 
     815              : #[cfg(test)]
     816              : pub(crate) mod tests {
     817              :     use super::*;
     818              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     819              :     use rand::Rng;
     820              :     use std::collections::BTreeMap;
     821              :     use std::sync::atomic::{AtomicUsize, Ordering};
     822              : 
    /// In-memory stand-in for a block device, used by the tests.
    /// Block number `n` is stored at `blocks[n]`.
    #[derive(Clone, Default)]
    pub(crate) struct TestDisk {
        blocks: Vec<Bytes>,
    }
     827              :     impl TestDisk {
     828           10 :         fn new() -> Self {
     829           10 :             Self::default()
     830           10 :         }
     831      1016608 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     832      1016608 :             let mut buf = [0u8; PAGE_SZ];
     833      1016608 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     834      1016608 :             Ok(std::sync::Arc::new(buf).into())
     835      1016608 :         }
     836              :     }
    impl BlockReader for TestDisk {
        /// Wraps this disk in a `BlockReaderRef::TestDisk` and returns a block
        /// cursor over it.
        fn block_cursor(&self) -> BlockCursor<'_> {
            BlockCursor::new(BlockReaderRef::TestDisk(self))
        }
    }
     842              :     impl BlockWriter for &mut TestDisk {
     843          215 :         fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
     844          215 :             let blknum = self.blocks.len();
     845          215 :             self.blocks.push(buf);
     846          215 :             Ok(blknum as u32)
     847          215 :         }
     848              :     }
     849              : 
    /// Builds a small tree from eight 6-byte keys and checks point lookups as
    /// well as forward, backward and full scans against the input data.
    #[tokio::test]
    async fn basic() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Keys are listed in increasing order, as `append` requires. The value
        // of each key is its position in this list.
        let all_keys: Vec<&[u8; 6]> = vec![
            b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
        ];
        let all_data: Vec<(&[u8; 6], u64)> = all_keys
            .iter()
            .enumerate()
            .map(|(idx, key)| (*key, idx as u64))
            .collect();
        for (key, val) in all_data.iter() {
            writer.append(key, *val)?;
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        // Test the `get` function on all the keys.
        for (key, val) in all_data.iter() {
            assert_eq!(reader.get(key, &ctx).await?, Some(*val));
        }
        // And on some keys that don't exist: one below all keys, one above all
        // keys, and one that falls between two existing keys.
        assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
        assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
        assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);

        // Test search with `visit` function: a forward scan is expected to
        // yield every entry with key >= search key, in increasing order.
        let search_key = b"xabaaa";
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] >= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();

        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Test a backwards scan: every entry with key <= search key is
        // expected, in decreasing order.
        let mut expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] <= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        expected.reverse();
        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Backwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Backward scan where nothing matches: the start key sorts before
        // every key in the tree, so the callback must never run.
        reader
            .visit(
                b"aaaaaa",
                VisitDirection::Backwards,
                |key, value| {
                    panic!("found unexpected key {}: {}", hex::encode(key), value);
                },
                &ctx,
            )
            .await?;

        // Full scan: starting from the all-zero key must return everything.
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        let mut data = Vec::new();
        reader
            .visit(
                &[0u8; 6],
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        Ok(())
    }
     960              : 
    /// Builds a tree from 1000 odd 8-byte integer keys and compares lookups and
    /// bounded forward/backward scans from every possible start key (present
    /// or not) against a `BTreeMap` holding the same data.
    #[tokio::test]
    async fn lots_of_keys() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        const NUM_KEYS: u64 = 1000;

        let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();

        // Only odd keys are inserted, so every even key is a "miss" that falls
        // between two existing keys. Big-endian encoding makes byte order
        // match integer order.
        for idx in 0..NUM_KEYS {
            let key_int: u64 = 1 + idx * 2;
            let key = u64::to_be_bytes(key_int);
            writer.append(&key, idx)?;

            all_data.insert(key_int, idx);
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        use std::sync::Mutex;

        // Visitor shared by all scans below: collects decoded entries into
        // `result` and stops once `limit` entries have been gathered.
        let result = Mutex::new(Vec::new());
        let limit: AtomicUsize = AtomicUsize::new(10);
        let take_ten = |key: &[u8], value: u64| {
            let mut keybuf = [0u8; 8];
            keybuf.copy_from_slice(key);
            let key_int = u64::from_be_bytes(keybuf);

            let mut result = result.lock().unwrap();
            result.push((key_int, value));

            // keep going until we have collected `limit` matches (10 initially)
            result.len() < limit.load(Ordering::Relaxed)
        };

        // The range extends a little past the largest key so that start keys
        // beyond the end of the tree are covered too.
        for search_key_int in 0..(NUM_KEYS * 2 + 10) {
            let search_key = u64::to_be_bytes(search_key_int);
            assert_eq!(
                reader.get(&search_key, &ctx).await?,
                all_data.get(&search_key_int).cloned()
            );

            // Test a forward scan starting with this key
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
                .await?;
            let expected = all_data
                .range(search_key_int..)
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);

            // And a backwards scan
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
                .await?;
            let expected = all_data
                .range(..=search_key_int)
                .rev()
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);
        }

        // full forward scan: lift the limit and start from the smallest
        // possible key
        let search_key = u64::to_be_bytes(0);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
            .await?;
        let expected = all_data
            .iter()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        // full backward scan: start from the largest possible key
        let search_key = u64::to_be_bytes(u64::MAX);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
            .await?;
        let expected = all_data
            .iter()
            .rev()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        Ok(())
    }
    1063              : 
    /// Builds a tree from randomly generated 16-byte keys and checks lookups
    /// of present keys, random keys and the two extreme keys, plus a full
    /// iteration, against a `BTreeMap` holding the same data.
    #[tokio::test]
    async fn random_data() -> Result<()> {
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Generate random keys with exponential distribution, to
        // exercise the prefix compression
        const NUM_KEYS: usize = 100000;
        let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
        for idx in 0..NUM_KEYS {
            let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
            let t = -(f64::ln(u));
            let key_int = (t * 1000000.0) as u128;

            // A repeated key overwrites the earlier value, so the map may end
            // up with fewer than NUM_KEYS entries.
            all_data.insert(key_int, idx as u64);
        }

        // Build a tree from it. Iterating the BTreeMap yields the keys in
        // increasing order, as `append` requires.
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);

        for (&key, &val) in all_data.iter() {
            writer.append(&u128::to_be_bytes(key), val)?;
        }
        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        // Test get() operation on all the keys
        for (&key, &val) in all_data.iter() {
            let search_key = u128::to_be_bytes(key);
            assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
        }

        // Test get() operations on random keys, most of which will not exist
        for _ in 0..100000 {
            let key_int = rand::thread_rng().gen::<u128>();
            let search_key = u128::to_be_bytes(key_int);
            assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
        }

        // Test boundary cases
        assert!(
            reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
                == all_data.get(&u128::MIN).cloned()
        );
        assert!(
            reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
                == all_data.get(&u128::MAX).cloned()
        );

        // Test the `iter` API: starting from the all-zero key it must yield
        // every entry exactly once, with the value the map holds for that key.
        let mut iter = reader.iter(&[0; 16], &ctx);
        let mut cnt = 0;
        while let Some(res) = iter.next().await {
            let (key, val) = res?;
            let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
            assert_eq!(val, *all_data.get(&key).unwrap());
            cnt += 1;
        }
        assert_eq!(cnt, all_data.len());

        Ok(())
    }
    1127              : 
    1128              :     #[test]
    1129            2 :     fn unsorted_input() {
                                // Checks the builder's handling of out-of-order input: appending a
                                // key that sorts before the previously appended key is expected to
                                // fail with DiskBtreeError::UnsortedInput, and that error is expected
                                // to report both the rejected key and the last accepted key.
    1130            2 :         let mut disk = TestDisk::new();
                                // The second generic argument (2) matches the 2-byte keys used below;
                                // presumably it is the fixed key width -- confirm against
                                // DiskBtreeBuilder.
    1131            2 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1132            2 : 
                                // Two appends in ascending key order. Their results are discarded, so
                                // a failure here would only show up indirectly in the checks below.
    1133            2 :         let _ = writer.append(b"ba", 1);
    1134            2 :         let _ = writer.append(b"bb", 2);
                                // "aa" sorts before the previously appended "bb", so this append must
                                // return an error; expect_err() fails the test if it succeeds.
    1135            2 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1136            2 :         match err {
                                    // `key` must be the rejected key and `last_key` the key that was
                                    // appended immediately before it.
    1137            2 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1138            2 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1139            2 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1140              :             }
                                    // Any other error variant fails the test.
    1141            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1142              :         }
    1143            2 :     }
    1144              : 
    1145              :     ///
    1146              :     /// This test uses a particular, fixed data set; see disk_btree_test_data.rs
                            ///
                            /// Every entry of `TEST_DATA` is appended to a fresh tree. The test then
                            /// looks each key up again with `get()`, counts the entries seen by a
                            /// forward `visit()` started at the all-zero key, and finally calls
                            /// `dump()` on the reader. Any error from the tree operations is
                            /// propagated with `?` and fails the test.
    1147              :     ///
    1148              :     #[tokio::test]
    1149            2 :     async fn particular_data() -> Result<()> {
    1150            2 :         // Build a tree from the test data
    1151            2 :         let mut disk = TestDisk::new();
                                // The second generic argument (26) matches the 26-byte search key
                                // used for the scan below; presumably it is the fixed key width --
                                // confirm against DiskBtreeBuilder.
    1152            2 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1153            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1154            2 : 
    1155         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1156         4000 :             writer.append(&key, val)?;
    1157            2 :         }
                                // finish() returns the root offset together with a value exposing
                                // `blocks`, which shadows the builder binding from here on.
                                // NOTE(review): presumably this is the underlying writer handed to
                                // new() above -- confirm against finish().
    1158            2 :         let (root_offset, writer) = writer.finish()?;
    1159            2 : 
                                // Informational only: the block count is printed, not asserted.
    1160            2 :         println!("SIZE: {} blocks", writer.blocks.len());
    1161            2 : 
                                // Open a reader over the same disk, using the root offset from finish().
                                // NOTE(review): the leading 0 is presumably the tree's start block --
                                // confirm against DiskBtreeReader::new.
    1162            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1163            2 : 
    1164            2 :         // Test get() on every key: each must return the value it was appended with
    1165         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1166         4000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1167            2 :         }
    1168            2 : 
    1169            2 :         // Test full scan: count the entries reported by a forward visit
    1170            2 :         let mut count = 0;
    1171            2 :         reader
    1172            2 :             .visit(
                                        // All-zero search key combined with a forward scan; the
                                        // assertion after the call expects this to reach every entry.
    1173            2 :                 &[0u8; 26],
    1174            2 :                 VisitDirection::Forwards,
                                        // Keys and values are ignored; only the number of callbacks
                                        // matters. Returning true presumably tells visit() to keep
                                        // going -- confirm against visit()'s callback contract.
    1175         4000 :                 |_key, _value| {
    1176         4000 :                     count += 1;
    1177         4000 :                     true
    1178         4000 :                 },
    1179            2 :                 &ctx,
    1180            2 :             )
    1181            2 :             .await?;
    1182            2 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1183            2 : 
                                // dump() only has to complete without error; nothing it produces is
                                // checked here.
    1184            2 :         reader.dump().await?;
    1185            2 : 
    1186            2 :         Ok(())
    1187            2 :     }
    1188              : }
    1189              : 
    1190              : #[cfg(test)]
                        // Test-only module that supplies `TEST_DATA` to the `particular_data` test.
                        // Its source is the file named by the #[path] attribute below.
    1191              : #[path = "disk_btree_test_data.rs"]
    1192              : mod disk_btree_test_data;
         |