LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 98.9 % 846 837
Test Date: 2024-09-25 14:04:07 Functions: 82.8 % 209 173

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use async_stream::try_stream;
      22              : use byteorder::{ReadBytesExt, BE};
      23              : use bytes::{BufMut, Bytes, BytesMut};
      24              : use either::Either;
      25              : use futures::{Stream, StreamExt};
      26              : use hex;
      27              : use std::{
      28              :     cmp::Ordering,
      29              :     io,
      30              :     iter::Rev,
      31              :     ops::{Range, RangeInclusive},
      32              :     result,
      33              : };
      34              : use thiserror::Error;
      35              : use tracing::error;
      36              : 
      37              : use crate::{
      38              :     context::{DownloadBehavior, RequestContext},
      39              :     task_mgr::TaskKind,
      40              :     tenant::block_io::{BlockReader, BlockWriter},
      41              : };
      42              : 
      43              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      44              : pub const VALUE_SZ: usize = 5;
      45              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      46              : 
      47              : pub const PAGE_SZ: usize = 8192;
      48              : 
      49              : #[derive(Clone, Copy, Debug)]
      50              : struct Value([u8; VALUE_SZ]);
      51              : 
      52              : impl Value {
      53     21054804 :     fn from_slice(slice: &[u8]) -> Value {
      54     21054804 :         let mut b = [0u8; VALUE_SZ];
      55     21054804 :         b.copy_from_slice(slice);
      56     21054804 :         Value(b)
      57     21054804 :     }
      58              : 
      59     21655084 :     fn from_u64(x: u64) -> Value {
      60     21655084 :         assert!(x <= 0x007f_ffff_ffff);
      61     21655084 :         Value([
      62     21655084 :             (x >> 32) as u8,
      63     21655084 :             (x >> 24) as u8,
      64     21655084 :             (x >> 16) as u8,
      65     21655084 :             (x >> 8) as u8,
      66     21655084 :             x as u8,
      67     21655084 :         ])
      68     21655084 :     }
      69              : 
      70        38658 :     fn from_blknum(x: u32) -> Value {
      71        38658 :         Value([
      72        38658 :             0x80,
      73        38658 :             (x >> 24) as u8,
      74        38658 :             (x >> 16) as u8,
      75        38658 :             (x >> 8) as u8,
      76        38658 :             x as u8,
      77        38658 :         ])
      78        38658 :     }
      79              : 
      80              :     #[allow(dead_code)]
      81            0 :     fn is_offset(self) -> bool {
      82            0 :         self.0[0] & 0x80 != 0
      83            0 :     }
      84              : 
      85     19074088 :     fn to_u64(self) -> u64 {
      86     19074088 :         let b = &self.0;
      87     19074088 :         (b[0] as u64) << 32
      88     19074088 :             | (b[1] as u64) << 24
      89     19074088 :             | (b[2] as u64) << 16
      90     19074088 :             | (b[3] as u64) << 8
      91     19074088 :             | b[4] as u64
      92     19074088 :     }
      93              : 
      94      1962644 :     fn to_blknum(self) -> u32 {
      95      1962644 :         let b = &self.0;
      96      1962644 :         assert!(b[0] == 0x80);
      97      1962644 :         (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
      98      1962644 :     }
      99              : }
     100              : 
     101            0 : #[derive(Error, Debug)]
     102              : pub enum DiskBtreeError {
     103              :     #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
     104              :     AppendOverflow(u64),
     105              : 
     106              :     #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
     107              :     UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
     108              : 
     109              :     #[error("Could not push to new leaf node")]
     110              :     FailedToPushToNewLeafNode,
     111              : 
     112              :     #[error("IoError: {0}")]
     113              :     Io(#[from] io::Error),
     114              : }
     115              : 
     116              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     117              : 
     118              : /// This is the on-disk representation.
     119              : struct OnDiskNode<'a, const L: usize> {
     120              :     // Fixed-width fields
     121              :     num_children: u16,
     122              :     level: u8,
     123              :     prefix_len: u8,
     124              :     suffix_len: u8,
     125              : 
     126              :     // Variable-length fields. These are stored on-disk after the fixed-width
     127              :     // fields, in this order. In the in-memory representation, these point to
     128              :     // the right parts in the page buffer.
     129              :     prefix: &'a [u8],
     130              :     keys: &'a [u8],
     131              :     values: &'a [u8],
     132              : }
     133              : 
     134              : impl<'a, const L: usize> OnDiskNode<'a, L> {
     135              :     ///
     136              :     /// Interpret a PAGE_SZ page as a node.
     137              :     ///
     138      4701337 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     139      4701337 :         let mut cursor = std::io::Cursor::new(buf);
     140      4701337 :         let num_children = cursor.read_u16::<BE>()?;
     141      4701337 :         let level = cursor.read_u8()?;
     142      4701337 :         let prefix_len = cursor.read_u8()?;
     143      4701337 :         let suffix_len = cursor.read_u8()?;
     144              : 
     145      4701337 :         let mut off = cursor.position();
     146      4701337 :         let prefix_off = off as usize;
     147      4701337 :         off += prefix_len as u64;
     148      4701337 : 
     149      4701337 :         let keys_off = off as usize;
     150      4701337 :         let keys_len = num_children as usize * suffix_len as usize;
     151      4701337 :         off += keys_len as u64;
     152      4701337 : 
     153      4701337 :         let values_off = off as usize;
     154      4701337 :         let values_len = num_children as usize * VALUE_SZ;
     155      4701337 :         //off += values_len as u64;
     156      4701337 : 
     157      4701337 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     158      4701337 :         let keys = &buf[keys_off..keys_off + keys_len];
     159      4701337 :         let values = &buf[values_off..values_off + values_len];
     160      4701337 : 
     161      4701337 :         Ok(OnDiskNode {
     162      4701337 :             num_children,
     163      4701337 :             level,
     164      4701337 :             prefix_len,
     165      4701337 :             suffix_len,
     166      4701337 :             prefix,
     167      4701337 :             keys,
     168      4701337 :             values,
     169      4701337 :         })
     170      4701337 :     }
     171              : 
     172              :     ///
     173              :     /// Read a value at 'idx'
     174              :     ///
     175     21054804 :     fn value(&self, idx: usize) -> Value {
     176     21054804 :         let value_off = idx * VALUE_SZ;
     177     21054804 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     178     21054804 :         Value::from_slice(value_slice)
     179     21054804 :     }
     180              : 
     181      4029692 :     fn binary_search(
     182      4029692 :         &self,
     183      4029692 :         search_key: &[u8; L],
     184      4029692 :         keybuf: &mut [u8],
     185      4029692 :     ) -> result::Result<usize, usize> {
     186      4029692 :         let mut size = self.num_children as usize;
     187      4029692 :         let mut low = 0;
     188      4029692 :         let mut high = size;
     189     30451022 :         while low < high {
     190     27062394 :             let mid = low + size / 2;
     191     27062394 : 
     192     27062394 :             let key_off = mid * self.suffix_len as usize;
     193     27062394 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     194     27062394 :             // Does this match?
     195     27062394 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     196     27062394 : 
     197     27062394 :             let cmp = keybuf[..].cmp(search_key);
     198     27062394 : 
     199     27062394 :             if cmp == Ordering::Less {
     200     17077695 :                 low = mid + 1;
     201     17077695 :             } else if cmp == Ordering::Greater {
     202      9343635 :                 high = mid;
     203      9343635 :             } else {
     204       641064 :                 return Ok(mid);
     205              :             }
     206     26421330 :             size = high - low;
     207              :         }
     208      3388628 :         Err(low)
     209      4029692 :     }
     210              : }
     211              : 
     212              : ///
     213              : /// Public reader object, to search the tree.
     214              : ///
     215              : #[derive(Clone)]
     216              : pub struct DiskBtreeReader<R, const L: usize>
     217              : where
     218              :     R: BlockReader,
     219              : {
     220              :     start_blk: u32,
     221              :     root_blk: u32,
     222              :     reader: R,
     223              : }
     224              : 
     225              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     226              : pub enum VisitDirection {
     227              :     Forwards,
     228              :     Backwards,
     229              : }
     230              : 
     231              : impl<R, const L: usize> DiskBtreeReader<R, L>
     232              : where
     233              :     R: BlockReader,
     234              : {
     235       642007 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     236       642007 :         DiskBtreeReader {
     237       642007 :             start_blk,
     238       642007 :             root_blk,
     239       642007 :             reader,
     240       642007 :         }
     241       642007 :     }
     242              : 
     243              :     ///
     244              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     245              :     ///
     246      1209454 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     247      1209454 :         let mut result: Option<u64> = None;
     248      1209454 :         self.visit(
     249      1209454 :             search_key,
     250      1209454 :             VisitDirection::Forwards,
     251      1209454 :             |key, value| {
     252       609382 :                 if key == search_key {
     253       603364 :                     result = Some(value);
     254       603364 :                 }
     255       609382 :                 false
     256      1209454 :             },
     257      1209454 :             ctx,
     258      1209454 :         )
     259            0 :         .await?;
     260      1209454 :         Ok(result)
     261      1209454 :     }
     262              : 
     263         1872 :     pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
     264         1872 :     where
     265         1872 :         R: 'a + Send,
     266         1872 :     {
     267         1872 :         DiskBtreeIterator {
     268         1872 :             stream: Box::pin(self.into_stream(start_key, ctx)),
     269         1872 :         }
     270         1872 :     }
     271              : 
     272              :     /// Return a stream which yields all key, value pairs from the index
     273              :     /// starting from the first key greater or equal to `start_key`.
     274              :     ///
     275              :     /// Note 1: this is a copy of [`Self::visit`].
     276              :     /// TODO: Once the sequential read path is removed this will become
     277              :     /// the only index traversal method.
     278              :     ///
     279              :     /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
     280              :     /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
     281              :     /// requires the reader to be present when the stream is used, and this creates a lifetime
     282              :     /// dependency between the reader and the stream. Now if we want to create an iterator that
     283              :     /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
     284              :     /// to use from the image/delta layer APIs.
     285              :     ///
     286              :     /// Feel free to add the `&self` variant back if it's necessary.
     287       832238 :     pub fn into_stream<'a>(
     288       832238 :         self,
     289       832238 :         start_key: &'a [u8; L],
     290       832238 :         ctx: &'a RequestContext,
     291       832238 :     ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
     292       832238 :     where
     293       832238 :         R: 'a,
     294       832238 :     {
     295       832238 :         try_stream! {
     296       832238 :             let mut stack = Vec::new();
     297       832238 :             stack.push((self.root_blk, None));
     298       832238 :             let block_cursor = self.reader.block_cursor();
     299       832238 :             let mut node_buf = [0_u8; PAGE_SZ];
     300       832238 :             while let Some((node_blknum, opt_iter)) = stack.pop() {
     301       832238 :                 // Read the node, through the PS PageCache, into local variable `node_buf`.
     302       832238 :                 // We could keep the page cache read guard alive, but, at the time of writing,
     303       832238 :                 // we run quite small PS PageCaches => can't risk running out of
     304       832238 :                 // PageCache space because this stream isn't consumed fast enough.
     305       832238 :                 let page_read_guard = block_cursor
     306       832238 :                     .read_blk(self.start_blk + node_blknum, ctx)
     307       832238 :                     .await?;
     308       832238 :                 node_buf.copy_from_slice(page_read_guard.as_ref());
     309       832238 :                 drop(page_read_guard); // drop page cache read guard early
     310       832238 : 
     311       832238 :                 let node = OnDiskNode::deparse(&node_buf)?;
     312       832238 :                 let prefix_len = node.prefix_len as usize;
     313       832238 :                 let suffix_len = node.suffix_len as usize;
     314       832238 : 
     315       832238 :                 assert!(node.num_children > 0);
     316       832238 : 
     317       832238 :                 let mut keybuf = Vec::new();
     318       832238 :                 keybuf.extend(node.prefix);
     319       832238 :                 keybuf.resize(prefix_len + suffix_len, 0);
     320       832238 : 
     321       832238 :                 let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
     322       832238 :                     iter
     323       832238 :                 } else {
     324       832238 :                     // Locate the first match
     325       832238 :                     let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
     326       832238 :                         Ok(idx) => idx,
     327       832238 :                         Err(idx) => {
     328       832238 :                             if node.level == 0 {
     329       832238 :                                 // Imagine that the node contains the following keys:
     330       832238 :                                 //
     331       832238 :                                 // 1
     332       832238 :                                 // 3  <-- idx
     333       832238 :                                 // 5
     334       832238 :                                 //
     335       832238 :                                 // If the search key is '2' and there is no exact match,
     336       832238 :                                 // the binary search would return the index of key
     337       832238 :                                 // '3'. That's cool, '3' is the first key to return.
     338       832238 :                                 idx
     339       832238 :                             } else {
     340       832238 :                                 // This is an internal page, so each key represents a lower
     341       832238 :                                 // bound for what's in the child page. If there is no exact
     342       832238 :                                 // match, we have to return the *previous* entry.
     343       832238 :                                 //
     344       832238 :                                 // 1  <-- return this
     345       832238 :                                 // 3  <-- idx
     346       832238 :                                 // 5
     347       832238 :                                 idx.saturating_sub(1)
     348       832238 :                             }
     349       832238 :                         }
     350       832238 :                     };
     351       832238 :                     Either::Left(idx..node.num_children.into())
     352       832238 :                 };
     353       832238 : 
     354       832238 : 
     355       832238 :                 // idx points to the first match now. Keep going from there
     356       832238 :                 while let Some(idx) = iter.next() {
     357       832238 :                     let key_off = idx * suffix_len;
     358       832238 :                     let suffix = &node.keys[key_off..key_off + suffix_len];
     359       832238 :                     keybuf[prefix_len..].copy_from_slice(suffix);
     360       832238 :                     let value = node.value(idx);
     361       832238 :                     #[allow(clippy::collapsible_if)]
     362       832238 :                     if node.level == 0 {
     363       832238 :                         // leaf
     364       832238 :                         yield (keybuf.clone(), value.to_u64());
     365       832238 :                     } else {
     366       832238 :                         stack.push((node_blknum, Some(iter)));
     367       832238 :                         stack.push((value.to_blknum(), None));
     368       832238 :                         break;
     369       832238 :                     }
     370       832238 :                 }
     371       832238 :             }
     372       832238 :         }
     373       832238 :     }
     374              : 
     375              :     ///
     376              :     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     377              :     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
     378              :     /// backwards)
     379              :     ///
     380      1234834 :     pub async fn visit<V>(
     381      1234834 :         &self,
     382      1234834 :         search_key: &[u8; L],
     383      1234834 :         dir: VisitDirection,
     384      1234834 :         mut visitor: V,
     385      1234834 :         ctx: &RequestContext,
     386      1234834 :     ) -> Result<bool>
     387      1234834 :     where
     388      1234834 :         V: FnMut(&[u8], u64) -> bool,
     389      1234834 :     {
     390      1234834 :         let mut stack = Vec::new();
     391      1234834 :         stack.push((self.root_blk, None));
     392      1234834 :         let block_cursor = self.reader.block_cursor();
     393      3657146 :         while let Some((node_blknum, opt_iter)) = stack.pop() {
     394              :             // Locate the node.
     395      3055544 :             let node_buf = block_cursor
     396      3055544 :                 .read_blk(self.start_blk + node_blknum, ctx)
     397         6357 :                 .await?;
     398              : 
     399      3055544 :             let node = OnDiskNode::deparse(node_buf.as_ref())?;
     400      3055544 :             let prefix_len = node.prefix_len as usize;
     401      3055544 :             let suffix_len = node.suffix_len as usize;
     402      3055544 : 
     403      3055544 :             assert!(node.num_children > 0);
     404              : 
     405      3055544 :             let mut keybuf = Vec::new();
     406      3055544 :             keybuf.extend(node.prefix);
     407      3055544 :             keybuf.resize(prefix_len + suffix_len, 0);
     408              : 
     409      3055544 :             let mut iter = if let Some(iter) = opt_iter {
     410       611694 :                 iter
     411      2443850 :             } else if dir == VisitDirection::Forwards {
     412              :                 // Locate the first match
     413      2431772 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     414       609988 :                     Ok(idx) => idx,
     415      1821784 :                     Err(idx) => {
     416      1821784 :                         if node.level == 0 {
     417              :                             // Imagine that the node contains the following keys:
     418              :                             //
     419              :                             // 1
     420              :                             // 3  <-- idx
     421              :                             // 5
     422              :                             //
     423              :                             // If the search key is '2' and there is no exact match,
     424              :                             // the binary search would return the index of key
     425              :                             // '3'. That's cool, '3' is the first key to return.
     426       624348 :                             idx
     427              :                         } else {
     428              :                             // This is an internal page, so each key represents a lower
     429              :                             // bound for what's in the child page. If there is no exact
     430              :                             // match, we have to return the *previous* entry.
     431              :                             //
     432              :                             // 1  <-- return this
     433              :                             // 3  <-- idx
     434              :                             // 5
     435      1197436 :                             idx.saturating_sub(1)
     436              :                         }
     437              :                     }
     438              :                 };
     439      2431772 :                 Either::Left(idx..node.num_children.into())
     440              :             } else {
     441        12078 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     442         6006 :                     Ok(idx) => {
     443         6006 :                         // Exact match. That's the first entry to return, and walk
     444         6006 :                         // backwards from there.
     445         6006 :                         idx
     446              :                     }
     447         6072 :                     Err(idx) => {
     448              :                         // No exact match. The binary search returned the index of the
     449              :                         // first key that's > search_key. Back off by one, and walk
     450              :                         // backwards from there.
     451         6072 :                         if let Some(idx) = idx.checked_sub(1) {
     452         6060 :                             idx
     453              :                         } else {
     454           12 :                             return Ok(false);
     455              :                         }
     456              :                     }
     457              :                 };
     458        12066 :                 Either::Right((0..=idx).rev())
     459              :             };
     460              : 
     461              :             // idx points to the first match now. Keep going from there
     462      9487394 :             while let Some(idx) = iter.next() {
     463      8274098 :                 let key_off = idx * suffix_len;
     464      8274098 :                 let suffix = &node.keys[key_off..key_off + suffix_len];
     465      8274098 :                 keybuf[prefix_len..].copy_from_slice(suffix);
     466      8274098 :                 let value = node.value(idx);
     467      8274098 :                 #[allow(clippy::collapsible_if)]
     468      8274098 :                 if node.level == 0 {
     469              :                     // leaf
     470      7065082 :                     if !visitor(&keybuf, value.to_u64()) {
     471       633220 :                         return Ok(false);
     472      6431862 :                     }
     473              :                 } else {
     474      1209016 :                     stack.push((node_blknum, Some(iter)));
     475      1209016 :                     stack.push((value.to_blknum(), None));
     476      1209016 :                     break;
     477              :                 }
     478              :             }
     479              :         }
     480       601602 :         Ok(true)
     481      1234834 :     }
     482              : 
     483              :     #[allow(dead_code)]
     484           30 :     pub async fn dump(&self) -> Result<()> {
     485           30 :         let mut stack = Vec::new();
     486           30 :         let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     487           30 : 
     488           30 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     489           30 : 
     490           30 :         let block_cursor = self.reader.block_cursor();
     491              : 
     492        18126 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     493        18096 :             let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
     494        18096 :             let buf: &[u8] = blk.as_ref();
     495        18096 :             let node = OnDiskNode::<L>::deparse(buf)?;
     496              : 
     497        18096 :             if child_idx == 0 {
     498           54 :                 print!("{:indent$}", "", indent = depth * 2);
     499           54 :                 let path_prefix = stack
     500           54 :                     .iter()
     501           54 :                     .map(|(_blknum, path, ..)| path.as_str())
     502           54 :                     .collect::<String>();
     503           54 :                 println!(
     504           54 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     505           54 :                     hex::encode(node.prefix),
     506           54 :                     node.suffix_len
     507           54 :                 );
     508        18042 :             }
     509              : 
     510        18096 :             if child_idx + 1 < node.num_children {
     511        18042 :                 let key_off = key_off + node.suffix_len as usize;
     512        18042 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     513        18042 :             }
     514        18096 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     515        18096 :             let val = node.value(child_idx as usize);
     516        18096 : 
     517        18096 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     518        18096 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     519        18096 : 
     520        18096 :             if node.level > 0 {
     521           24 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     522        18072 :             }
     523              :         }
     524           30 :         Ok(())
     525           30 :     }
     526              : }
     527              : 
     528              : pub struct DiskBtreeIterator<'a> {
     529              :     #[allow(clippy::type_complexity)]
     530              :     stream: std::pin::Pin<
     531              :         Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
     532              :     >,
     533              : }
     534              : 
     535              : impl<'a> DiskBtreeIterator<'a> {
     536      6968816 :     pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
     537      6968816 :         self.stream.next().await
     538      6968816 :     }
     539              : }
     540              : 
///
/// Public builder object, for creating a new tree.
///
/// Usage: Create a builder object by calling 'new', load all the data into the
/// tree by calling 'append' for each key-value pair (in strictly increasing key
/// order), and then call 'finish'.
///
/// 'L' is the key length in bytes
pub struct DiskBtreeBuilder<W, const L: usize>
where
    W: BlockWriter,
{
    /// Destination for finished pages. Each write returns the block number
    /// that the parent page's downlink then points to.
    writer: W,

    ///
    /// `stack[0]` is the current root page, `stack.last()` is the leaf.
    ///
    /// The stack always holds at least one node. There are two exceptions,
    /// neither of which can be observed by any other method:
    /// 1. `Self::flush_node` removes the node it flushed; if that was the only
    ///    node, it pushes a fresh root before doing anything else with the stack.
    /// 2. `Self::finish` consumes `self` and never hands it back, so whatever
    ///    state the stack is left in is destroyed together with the builder.
    stack: Vec<BuildNode<L>>,

    /// Last key that was appended to the tree. Used to sanity check that append
    /// is called in increasing key order.
    last_key: Option<[u8; L]>,
}
     570              : 
     571              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     572              : where
     573              :     W: BlockWriter,
     574              : {
     575         5772 :     pub fn new(writer: W) -> Self {
     576         5772 :         DiskBtreeBuilder {
     577         5772 :             writer,
     578         5772 :             last_key: None,
     579         5772 :             stack: vec![BuildNode::new(0)],
     580         5772 :         }
     581         5772 :     }
     582              : 
     583     21655090 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     584     21655090 :         if value > MAX_VALUE {
     585            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     586     21655090 :         }
     587     21655090 :         if let Some(last_key) = &self.last_key {
     588     21649858 :             if key <= last_key {
     589            6 :                 return Err(DiskBtreeError::UnsortedInput {
     590            6 :                     key: key.as_slice().into(),
     591            6 :                     last_key: last_key.as_slice().into(),
     592            6 :                 });
     593     21649852 :             }
     594         5232 :         }
     595     21655084 :         self.last_key = Some(*key);
     596     21655084 : 
     597     21655084 :         self.append_internal(key, Value::from_u64(value))
     598     21655090 :     }
     599              : 
     600     21693742 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     601     21693742 :         // Try to append to the current leaf buffer
     602     21693742 :         let last = self
     603     21693742 :             .stack
     604     21693742 :             .last_mut()
     605     21693742 :             .expect("should always have at least one item");
     606     21693742 :         let level = last.level;
     607     21693742 :         if last.push(key, value) {
     608     21620185 :             return Ok(());
     609        73557 :         }
     610        73557 : 
     611        73557 :         // It did not fit. Try to compress, and if it succeeds to make
     612        73557 :         // some room on the node, try appending to it again.
     613        73557 :         #[allow(clippy::collapsible_if)]
     614        73557 :         if last.compress() {
     615        37321 :             if last.push(key, value) {
     616        37293 :                 return Ok(());
     617           28 :             }
     618        36236 :         }
     619              : 
     620              :         // Could not append to the current leaf. Flush it and create a new one.
     621        36264 :         self.flush_node()?;
     622              : 
     623              :         // Replace the node we flushed with an empty one and append the new
     624              :         // key to it.
     625        36264 :         let mut last = BuildNode::new(level);
     626        36264 :         if !last.push(key, value) {
     627            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     628        36264 :         }
     629        36264 : 
     630        36264 :         self.stack.push(last);
     631        36264 : 
     632        36264 :         Ok(())
     633     21693742 :     }
     634              : 
     635              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     636              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     637              :     /// creates a new root page, increasing the height of the tree.
     638        38658 :     fn flush_node(&mut self) -> Result<()> {
     639        38658 :         // Get the current bottommost node in the stack and flush it to disk.
     640        38658 :         let last = self
     641        38658 :             .stack
     642        38658 :             .pop()
     643        38658 :             .expect("should always have at least one item");
     644        38658 :         let buf = last.pack();
     645        38658 :         let downlink_key = last.first_key();
     646        38658 :         let downlink_ptr = self.writer.write_blk(buf)?;
     647              : 
     648              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     649              :         // create a new root page, increasing the height of the tree.
     650        38658 :         if self.stack.is_empty() {
     651         2394 :             self.stack.push(BuildNode::new(last.level + 1));
     652        36264 :         }
     653        38658 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     654        38658 :     }
     655              : 
     656              :     ///
     657              :     /// Flushes everything to disk, and returns the block number of the root page.
     658              :     /// The caller must store the root block number "out-of-band", and pass it
     659              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     660              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     661              :     /// in the summary header)
     662              :     ///
     663         5154 :     pub fn finish(mut self) -> Result<(u32, W)> {
     664              :         // flush all levels, except the root.
     665         7548 :         while self.stack.len() > 1 {
     666         2394 :             self.flush_node()?;
     667              :         }
     668              : 
     669         5154 :         let root = self
     670         5154 :             .stack
     671         5154 :             .first()
     672         5154 :             .expect("by the check above we left one item there");
     673         5154 :         let buf = root.pack();
     674         5154 :         let root_blknum = self.writer.write_blk(buf)?;
     675              : 
     676         5154 :         Ok((root_blknum, self.writer))
     677         5154 :     }
     678              : 
     679      6142314 :     pub fn borrow_writer(&self) -> &W {
     680      6142314 :         &self.writer
     681      6142314 :     }
     682              : }
     683              : 
///
/// BuildNode represents an incomplete page that we are appending to.
///
#[derive(Clone, Debug)]
struct BuildNode<const L: usize> {
    /// Number of key-value pairs currently stored in the node.
    num_children: u16,
    /// Level of the node in the tree. The builder starts with a level-0 node
    /// and creates each new root one level above the page it replaces.
    level: u8,
    /// Common key prefix that `compress` has stripped from every entry in
    /// `keys`. Empty until the node has been compressed.
    prefix: Vec<u8>,
    /// Length in bytes of each entry in `keys`, i.e. the key length minus the
    /// length of `prefix`.
    suffix_len: usize,

    /// Key suffixes, stored back to back (`num_children * suffix_len` bytes).
    keys: Vec<u8>,
    /// Values, stored back to back (`num_children * VALUE_SZ` bytes).
    values: Vec<u8>,

    size: usize, // physical size of this node, if it was written to disk like this
}
     699              : 
/// Size budget of one node: a node is serialized into exactly one page.
const NODE_SIZE: usize = PAGE_SZ;

/// Size of the fixed header that `BuildNode::pack` writes in front of the
/// prefix, keys and values: `num_children` (2 bytes) + `level` (1 byte) +
/// prefix length (1 byte) + suffix length (1 byte).
const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     703              : 
     704              : impl<const L: usize> BuildNode<L> {
     705        44430 :     fn new(level: u8) -> Self {
     706        44430 :         BuildNode {
     707        44430 :             num_children: 0,
     708        44430 :             level,
     709        44430 :             prefix: Vec::new(),
     710        44430 :             suffix_len: 0,
     711        44430 :             keys: Vec::new(),
     712        44430 :             values: Vec::new(),
     713        44430 :             size: NODE_HDR_SIZE,
     714        44430 :         }
     715        44430 :     }
     716              : 
     717              :     /// Try to append a key-value pair to this node. Returns 'true' on
     718              :     /// success, 'false' if the page was full or the key was
     719              :     /// incompatible with the prefix of the existing keys.
     720     21767327 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     721     21767327 :         // If we have already performed prefix-compression on the page,
     722     21767327 :         // check that the incoming key has the same prefix.
     723     21767327 :         if self.num_children > 0 {
     724              :             // does the prefix allow it?
     725     21723437 :             if !key.starts_with(&self.prefix) {
     726          666 :                 return false;
     727     21722771 :             }
     728        43890 :         } else {
     729        43890 :             self.suffix_len = key.len();
     730        43890 :         }
     731              : 
     732              :         // Is the node too full?
     733     21766661 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     734        72919 :             return false;
     735     21693742 :         }
     736     21693742 : 
     737     21693742 :         // All clear
     738     21693742 :         self.num_children += 1;
     739     21693742 :         self.keys.extend(&key[self.prefix.len()..]);
     740     21693742 :         self.values.extend(value.0);
     741     21693742 : 
     742     21693742 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     743     21693742 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     744              : 
     745     21693742 :         self.size += self.suffix_len + VALUE_SZ;
     746     21693742 : 
     747     21693742 :         true
     748     21767327 :     }
     749              : 
     750              :     ///
     751              :     /// Perform prefix-compression.
     752              :     ///
     753              :     /// Returns 'true' on success, 'false' if no compression was possible.
     754              :     ///
     755        73557 :     fn compress(&mut self) -> bool {
     756        73557 :         let first_suffix = self.first_suffix();
     757        73557 :         let last_suffix = self.last_suffix();
     758        73557 : 
     759        73557 :         // Find the common prefix among all keys
     760        73557 :         let mut prefix_len = 0;
     761       668813 :         while prefix_len < self.suffix_len {
     762       668813 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     763        73557 :                 break;
     764       595256 :             }
     765       595256 :             prefix_len += 1;
     766              :         }
     767        73557 :         if prefix_len == 0 {
     768        36236 :             return false;
     769        37321 :         }
     770        37321 : 
     771        37321 :         // Can compress. Rewrite the keys without the common prefix.
     772        37321 :         self.prefix.extend(&self.keys[..prefix_len]);
     773        37321 : 
     774        37321 :         let mut new_keys = Vec::new();
     775        37321 :         let mut key_off = 0;
     776     10096874 :         while key_off < self.keys.len() {
     777     10059553 :             let next_key_off = key_off + self.suffix_len;
     778     10059553 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     779     10059553 :             key_off = next_key_off;
     780     10059553 :         }
     781        37321 :         self.keys = new_keys;
     782        37321 :         self.suffix_len -= prefix_len;
     783        37321 : 
     784        37321 :         self.size -= prefix_len * self.num_children as usize;
     785        37321 :         self.size += prefix_len;
     786        37321 : 
     787        37321 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     788        37321 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     789              : 
     790        37321 :         true
     791        73557 :     }
     792              : 
     793              :     ///
     794              :     /// Serialize the node to on-disk format.
     795              :     ///
     796        43812 :     fn pack(&self) -> Bytes {
     797        43812 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     798        43812 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     799        43812 :         assert!(self.num_children > 0);
     800              : 
     801        43812 :         let mut buf = BytesMut::new();
     802        43812 : 
     803        43812 :         buf.put_u16(self.num_children);
     804        43812 :         buf.put_u8(self.level);
     805        43812 :         buf.put_u8(self.prefix.len() as u8);
     806        43812 :         buf.put_u8(self.suffix_len as u8);
     807        43812 :         buf.put(&self.prefix[..]);
     808        43812 :         buf.put(&self.keys[..]);
     809        43812 :         buf.put(&self.values[..]);
     810        43812 : 
     811        43812 :         assert!(buf.len() == self.size);
     812              : 
     813        43812 :         assert!(buf.len() <= PAGE_SZ);
     814        43812 :         buf.resize(PAGE_SZ, 0);
     815        43812 :         buf.freeze()
     816        43812 :     }
     817              : 
     818       112215 :     fn first_suffix(&self) -> &[u8] {
     819       112215 :         &self.keys[..self.suffix_len]
     820       112215 :     }
     821        73557 :     fn last_suffix(&self) -> &[u8] {
     822        73557 :         &self.keys[self.keys.len() - self.suffix_len..]
     823        73557 :     }
     824              : 
     825              :     /// Return the full first key of the page, including the prefix
     826        38658 :     fn first_key(&self) -> [u8; L] {
     827        38658 :         let mut key = [0u8; L];
     828        38658 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     829        38658 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     830        38658 :         key
     831        38658 :     }
     832              : }
     833              : 
     834              : #[cfg(test)]
     835              : pub(crate) mod tests {
     836              :     use super::*;
     837              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     838              :     use rand::Rng;
     839              :     use std::collections::BTreeMap;
     840              :     use std::sync::atomic::{AtomicUsize, Ordering};
     841              : 
    /// In-memory stand-in for a block device, used by the tests: block number
    /// `n` is simply `blocks[n]`.
    #[derive(Clone, Default)]
    pub(crate) struct TestDisk {
        /// Written blocks, in the order they were written.
        blocks: Vec<Bytes>,
    }
     846              :     impl TestDisk {
     847           30 :         fn new() -> Self {
     848           30 :             Self::default()
     849           30 :         }
     850      3050276 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     851      3050276 :             let mut buf = [0u8; PAGE_SZ];
     852      3050276 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     853      3050276 :             Ok(std::sync::Arc::new(buf).into())
     854      3050276 :         }
     855              :     }
     856              :     impl BlockReader for TestDisk {
     857      1233652 :         fn block_cursor(&self) -> BlockCursor<'_> {
     858      1233652 :             BlockCursor::new(BlockReaderRef::TestDisk(self))
     859      1233652 :         }
     860              :     }
     861              :     impl BlockWriter for &mut TestDisk {
     862          648 :         fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
     863          648 :             let blknum = self.blocks.len();
     864          648 :             self.blocks.push(buf);
     865          648 :             Ok(blknum as u32)
     866          648 :         }
     867              :     }
     868              : 
     869              :     #[tokio::test]
     870            6 :     async fn basic() -> Result<()> {
     871            6 :         let mut disk = TestDisk::new();
     872            6 :         let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
     873            6 : 
     874            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     875            6 : 
     876            6 :         let all_keys: Vec<&[u8; 6]> = vec![
     877            6 :             b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
     878            6 :         ];
     879            6 :         let all_data: Vec<(&[u8; 6], u64)> = all_keys
     880            6 :             .iter()
     881            6 :             .enumerate()
     882           48 :             .map(|(idx, key)| (*key, idx as u64))
     883            6 :             .collect();
     884           48 :         for (key, val) in all_data.iter() {
     885           48 :             writer.append(key, *val)?;
     886            6 :         }
     887            6 : 
     888            6 :         let (root_offset, _writer) = writer.finish()?;
     889            6 : 
     890            6 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     891            6 : 
     892            6 :         reader.dump().await?;
     893            6 : 
     894            6 :         // Test the `get` function on all the keys.
     895           48 :         for (key, val) in all_data.iter() {
     896           48 :             assert_eq!(reader.get(key, &ctx).await?, Some(*val));
     897            6 :         }
     898            6 :         // And on some keys that don't exist
     899            6 :         assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
     900            6 :         assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
     901            6 :         assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
     902            6 : 
     903            6 :         // Test search with `visit` function
     904            6 :         let search_key = b"xabaaa";
     905            6 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     906            6 :             .iter()
     907           48 :             .filter(|(key, _value)| key[..] >= search_key[..])
     908           30 :             .map(|(key, value)| (key.to_vec(), *value))
     909            6 :             .collect();
     910            6 : 
     911            6 :         let mut data = Vec::new();
     912            6 :         reader
     913            6 :             .visit(
     914            6 :                 search_key,
     915            6 :                 VisitDirection::Forwards,
     916           30 :                 |key, value| {
     917           30 :                     data.push((key.to_vec(), value));
     918           30 :                     true
     919           30 :                 },
     920            6 :                 &ctx,
     921            6 :             )
     922            6 :             .await?;
     923            6 :         assert_eq!(data, expected);
     924            6 : 
     925            6 :         // Test a backwards scan
     926            6 :         let mut expected: Vec<(Vec<u8>, u64)> = all_data
     927            6 :             .iter()
     928           48 :             .filter(|(key, _value)| key[..] <= search_key[..])
     929           24 :             .map(|(key, value)| (key.to_vec(), *value))
     930            6 :             .collect();
     931            6 :         expected.reverse();
     932            6 :         let mut data = Vec::new();
     933            6 :         reader
     934            6 :             .visit(
     935            6 :                 search_key,
     936            6 :                 VisitDirection::Backwards,
     937           24 :                 |key, value| {
     938           24 :                     data.push((key.to_vec(), value));
     939           24 :                     true
     940           24 :                 },
     941            6 :                 &ctx,
     942            6 :             )
     943            6 :             .await?;
     944            6 :         assert_eq!(data, expected);
     945            6 : 
     946            6 :         // Backward scan where nothing matches
     947            6 :         reader
     948            6 :             .visit(
     949            6 :                 b"aaaaaa",
     950            6 :                 VisitDirection::Backwards,
     951            6 :                 |key, value| {
     952            0 :                     panic!("found unexpected key {}: {}", hex::encode(key), value);
     953            6 :                 },
     954            6 :                 &ctx,
     955            6 :             )
     956            6 :             .await?;
     957            6 : 
     958            6 :         // Full scan
     959            6 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     960            6 :             .iter()
     961           48 :             .map(|(key, value)| (key.to_vec(), *value))
     962            6 :             .collect();
     963            6 :         let mut data = Vec::new();
     964            6 :         reader
     965            6 :             .visit(
     966            6 :                 &[0u8; 6],
     967            6 :                 VisitDirection::Forwards,
     968           48 :                 |key, value| {
     969           48 :                     data.push((key.to_vec(), value));
     970           48 :                     true
     971           48 :                 },
     972            6 :                 &ctx,
     973            6 :             )
     974            6 :             .await?;
     975            6 :         assert_eq!(data, expected);
     976            6 : 
     977            6 :         Ok(())
     978            6 :     }
     979              : 
     980              :     #[tokio::test]
     981            6 :     async fn lots_of_keys() -> Result<()> {
     982            6 :         let mut disk = TestDisk::new();
     983            6 :         let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
     984            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     985            6 : 
     986            6 :         const NUM_KEYS: u64 = 1000;
     987            6 : 
     988            6 :         let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
     989            6 : 
     990         6006 :         for idx in 0..NUM_KEYS {
     991         6000 :             let key_int: u64 = 1 + idx * 2;
     992         6000 :             let key = u64::to_be_bytes(key_int);
     993         6000 :             writer.append(&key, idx)?;
     994            6 : 
     995         6000 :             all_data.insert(key_int, idx);
     996            6 :         }
     997            6 : 
     998            6 :         let (root_offset, _writer) = writer.finish()?;
     999            6 : 
    1000            6 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1001            6 : 
    1002            6 :         reader.dump().await?;
    1003            6 : 
    1004            6 :         use std::sync::Mutex;
    1005            6 : 
    1006            6 :         let result = Mutex::new(Vec::new());
    1007            6 :         let limit: AtomicUsize = AtomicUsize::new(10);
    1008       251460 :         let take_ten = |key: &[u8], value: u64| {
    1009       251460 :             let mut keybuf = [0u8; 8];
    1010       251460 :             keybuf.copy_from_slice(key);
    1011       251460 :             let key_int = u64::from_be_bytes(keybuf);
    1012       251460 : 
    1013       251460 :             let mut result = result.lock().unwrap();
    1014       251460 :             result.push((key_int, value));
    1015       251460 : 
    1016       251460 :             // keep going until we have 10 matches
    1017       251460 :             result.len() < limit.load(Ordering::Relaxed)
    1018       251460 :         };
    1019            6 : 
    1020        12060 :         for search_key_int in 0..(NUM_KEYS * 2 + 10) {
    1021        12060 :             let search_key = u64::to_be_bytes(search_key_int);
    1022        12060 :             assert_eq!(
    1023        12060 :                 reader.get(&search_key, &ctx).await?,
    1024        12060 :                 all_data.get(&search_key_int).cloned()
    1025            6 :             );
    1026            6 : 
    1027            6 :             // Test a forward scan starting with this key
    1028        12060 :             result.lock().unwrap().clear();
    1029        12060 :             reader
    1030        12060 :                 .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1031            6 :                 .await?;
    1032        12060 :             let expected = all_data
    1033        12060 :                 .range(search_key_int..)
    1034        12060 :                 .take(10)
    1035       119460 :                 .map(|(&key, &val)| (key, val))
    1036        12060 :                 .collect::<Vec<(u64, u64)>>();
    1037        12060 :             assert_eq!(*result.lock().unwrap(), expected);
    1038            6 : 
    1039            6 :             // And a backwards scan
    1040        12060 :             result.lock().unwrap().clear();
    1041        12060 :             reader
    1042        12060 :                 .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1043            6 :                 .await?;
    1044        12060 :             let expected = all_data
    1045        12060 :                 .range(..=search_key_int)
    1046        12060 :                 .rev()
    1047        12060 :                 .take(10)
    1048       120000 :                 .map(|(&key, &val)| (key, val))
    1049        12060 :                 .collect::<Vec<(u64, u64)>>();
    1050        12060 :             assert_eq!(*result.lock().unwrap(), expected);
    1051            6 :         }
    1052            6 : 
    1053            6 :         // full scan
    1054            6 :         let search_key = u64::to_be_bytes(0);
    1055            6 :         limit.store(usize::MAX, Ordering::Relaxed);
    1056            6 :         result.lock().unwrap().clear();
    1057            6 :         reader
    1058            6 :             .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1059            6 :             .await?;
    1060            6 :         let expected = all_data
    1061            6 :             .iter()
    1062         6000 :             .map(|(&key, &val)| (key, val))
    1063            6 :             .collect::<Vec<(u64, u64)>>();
    1064            6 :         assert_eq!(*result.lock().unwrap(), expected);
    1065            6 : 
    1066            6 :         // full scan
    1067            6 :         let search_key = u64::to_be_bytes(u64::MAX);
    1068            6 :         limit.store(usize::MAX, Ordering::Relaxed);
    1069            6 :         result.lock().unwrap().clear();
    1070            6 :         reader
    1071            6 :             .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1072            6 :             .await?;
    1073            6 :         let expected = all_data
    1074            6 :             .iter()
    1075            6 :             .rev()
    1076         6000 :             .map(|(&key, &val)| (key, val))
    1077            6 :             .collect::<Vec<(u64, u64)>>();
    1078            6 :         assert_eq!(*result.lock().unwrap(), expected);
    1079            6 : 
    1080            6 :         Ok(())
    1081            6 :     }
    1082              : 
    1083              :     #[tokio::test]
    1084            6 :     async fn random_data() -> Result<()> {
    1085            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1086            6 : 
    1087            6 :         // Generate random keys with exponential distribution, to
    1088            6 :         // exercise the prefix compression
    1089            6 :         const NUM_KEYS: usize = 100000;
    1090            6 :         let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
    1091       600006 :         for idx in 0..NUM_KEYS {
    1092       600000 :             let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
    1093       600000 :             let t = -(f64::ln(u));
    1094       600000 :             let key_int = (t * 1000000.0) as u128;
    1095       600000 : 
    1096       600000 :             all_data.insert(key_int, idx as u64);
    1097       600000 :         }
    1098            6 : 
    1099            6 :         // Build a tree from it
    1100            6 :         let mut disk = TestDisk::new();
    1101            6 :         let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
    1102            6 : 
    1103       585316 :         for (&key, &val) in all_data.iter() {
    1104       585316 :             writer.append(&u128::to_be_bytes(key), val)?;
    1105            6 :         }
    1106            6 :         let (root_offset, _writer) = writer.finish()?;
    1107            6 : 
    1108            6 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1109            6 : 
    1110            6 :         // Test get() operation on all the keys
    1111       585316 :         for (&key, &val) in all_data.iter() {
    1112       585316 :             let search_key = u128::to_be_bytes(key);
    1113       585316 :             assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
    1114            6 :         }
    1115            6 : 
    1116            6 :         // Test get() operations on random keys, most of which will not exist
    1117       600006 :         for _ in 0..100000 {
    1118       600000 :             let key_int = rand::thread_rng().gen::<u128>();
    1119       600000 :             let search_key = u128::to_be_bytes(key_int);
    1120       600000 :             assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
    1121            6 :         }
    1122            6 : 
    1123            6 :         // Test boundary cases
    1124            6 :         assert!(
    1125            6 :             reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
    1126            6 :                 == all_data.get(&u128::MIN).cloned()
    1127            6 :         );
    1128            6 :         assert!(
    1129            6 :             reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
    1130            6 :                 == all_data.get(&u128::MAX).cloned()
    1131            6 :         );
    1132            6 : 
    1133            6 :         // Test iterator and get_stream API
    1134            6 :         let mut iter = reader.iter(&[0; 16], &ctx);
    1135            6 :         let mut cnt = 0;
    1136       585322 :         while let Some(res) = iter.next().await {
    1137       585316 :             let (key, val) = res?;
    1138       585316 :             let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
    1139       585316 :             assert_eq!(val, *all_data.get(&key).unwrap());
    1140       585316 :             cnt += 1;
    1141            6 :         }
    1142            6 :         assert_eq!(cnt, all_data.len());
    1143            6 : 
    1144            6 :         Ok(())
    1145            6 :     }
    1146              : 
    1147              :     #[test]
    1148            6 :     fn unsorted_input() {
    1149            6 :         let mut disk = TestDisk::new();
    1150            6 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1151            6 : 
    1152            6 :         let _ = writer.append(b"ba", 1);
    1153            6 :         let _ = writer.append(b"bb", 2);
    1154            6 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1155            6 :         match err {
    1156            6 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1157            6 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1158            6 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1159              :             }
    1160            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1161              :         }
    1162            6 :     }
    1163              : 
    1164              :     ///
    1165              :     /// This test contains a particular data set, see disk_btree_test_data.rs
    1166              :     ///
    1167              :     #[tokio::test]
    1168            6 :     async fn particular_data() -> Result<()> {
    1169            6 :         // Build a tree from it
    1170            6 :         let mut disk = TestDisk::new();
    1171            6 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1172            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1173            6 : 
    1174        12006 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1175        12000 :             writer.append(&key, val)?;
    1176            6 :         }
    1177            6 :         let (root_offset, writer) = writer.finish()?;
    1178            6 : 
    1179            6 :         println!("SIZE: {} blocks", writer.blocks.len());
    1180            6 : 
    1181            6 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1182            6 : 
    1183            6 :         // Test get() operation on all the keys
    1184        12006 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1185        12000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1186            6 :         }
    1187            6 : 
    1188            6 :         // Test full scan
    1189            6 :         let mut count = 0;
    1190            6 :         reader
    1191            6 :             .visit(
    1192            6 :                 &[0u8; 26],
    1193            6 :                 VisitDirection::Forwards,
    1194        12000 :                 |_key, _value| {
    1195        12000 :                     count += 1;
    1196        12000 :                     true
    1197        12000 :                 },
    1198            6 :                 &ctx,
    1199            6 :             )
    1200            6 :             .await?;
    1201            6 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1202            6 : 
    1203            6 :         reader.dump().await?;
    1204            6 : 
    1205            6 :         Ok(())
    1206            6 :     }
    1207              : }
    1208              : 
    1209              : #[cfg(test)]
    1210              : #[path = "disk_btree_test_data.rs"]
    1211              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta