LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 98.9 % 743 735
Test Date: 2024-05-10 13:18:37 Functions: 81.6 % 207 169

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use async_stream::try_stream;
      22              : use byteorder::{ReadBytesExt, BE};
      23              : use bytes::{BufMut, Bytes, BytesMut};
      24              : use either::Either;
      25              : use futures::Stream;
      26              : use hex;
      27              : use std::{
      28              :     cmp::Ordering,
      29              :     io,
      30              :     iter::Rev,
      31              :     ops::{Range, RangeInclusive},
      32              :     result,
      33              : };
      34              : use thiserror::Error;
      35              : use tracing::error;
      36              : 
      37              : use crate::{
      38              :     context::{DownloadBehavior, RequestContext},
      39              :     task_mgr::TaskKind,
      40              :     tenant::block_io::{BlockReader, BlockWriter},
      41              : };
      42              : 
      43              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      44              : pub const VALUE_SZ: usize = 5;
      45              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      46              : 
      47              : pub const PAGE_SZ: usize = 8192;
      48              : 
      49              : #[derive(Clone, Copy, Debug)]
      50              : struct Value([u8; VALUE_SZ]);
      51              : 
      52              : impl Value {
      53      3363783 :     fn from_slice(slice: &[u8]) -> Value {
      54      3363783 :         let mut b = [0u8; VALUE_SZ];
      55      3363783 :         b.copy_from_slice(slice);
      56      3363783 :         Value(b)
      57      3363783 :     }
      58              : 
      59      6606533 :     fn from_u64(x: u64) -> Value {
      60      6606533 :         assert!(x <= 0x007f_ffff_ffff);
      61      6606533 :         Value([
      62      6606533 :             (x >> 32) as u8,
      63      6606533 :             (x >> 24) as u8,
      64      6606533 :             (x >> 16) as u8,
      65      6606533 :             (x >> 8) as u8,
      66      6606533 :             x as u8,
      67      6606533 :         ])
      68      6606533 :     }
      69              : 
      70        12308 :     fn from_blknum(x: u32) -> Value {
      71        12308 :         Value([
      72        12308 :             0x80,
      73        12308 :             (x >> 24) as u8,
      74        12308 :             (x >> 16) as u8,
      75        12308 :             (x >> 8) as u8,
      76        12308 :             x as u8,
      77        12308 :         ])
      78        12308 :     }
      79              : 
      80              :     #[allow(dead_code)]
      81            0 :     fn is_offset(self) -> bool {
      82            0 :         self.0[0] & 0x80 != 0
      83            0 :     }
      84              : 
      85      2688160 :     fn to_u64(self) -> u64 {
      86      2688160 :         let b = &self.0;
      87      2688160 :         (b[0] as u64) << 32
      88      2688160 :             | (b[1] as u64) << 24
      89      2688160 :             | (b[2] as u64) << 16
      90      2688160 :             | (b[3] as u64) << 8
      91      2688160 :             | b[4] as u64
      92      2688160 :     }
      93              : 
      94       669599 :     fn to_blknum(self) -> u32 {
      95       669599 :         let b = &self.0;
      96       669599 :         assert!(b[0] == 0x80);
      97       669599 :         (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
      98       669599 :     }
      99              : }
     100              : 
/// Errors returned by the B-tree reader and builder in this file.
#[derive(Error, Debug)]
pub enum DiskBtreeError {
    /// `append` was called with a value greater than `MAX_VALUE`. Carries the
    /// rejected value.
    #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
    AppendOverflow(u64),

    /// `append` was called with a key that is not strictly greater than the
    /// previously appended key. Carries copies of both keys.
    #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
    UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },

    /// A key/value pair was rejected even by a freshly created, empty node.
    #[error("Could not push to new leaf node")]
    FailedToPushToNewLeafNode,

    /// An underlying I/O error; `io::Error` converts into this variant
    /// automatically when propagated with `?`.
    #[error("IoError: {0}")]
    Io(#[from] io::Error),
}
     115              : 
     116              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     117              : 
/// This is the on-disk representation.
///
/// A node is a borrowed view into a page buffer: the slice fields point into
/// the buffer that was handed to `deparse`. `L` is the length in bytes of the
/// search keys that the node is compared against.
struct OnDiskNode<'a, const L: usize> {
    // Fixed-width fields
    /// Number of key/value entries stored in this node.
    num_children: u16,
    /// Level of the node in the tree; 0 means a leaf node.
    level: u8,
    /// Length in bytes of `prefix`.
    prefix_len: u8,
    /// Length in bytes of each entry stored in `keys`.
    suffix_len: u8,

    // Variable-length fields. These are stored on-disk after the fixed-width
    // fields, in this order. In the in-memory representation, these point to
    // the right parts in the page buffer.
    /// Key bytes that are stored only once for the node; a full key is
    /// `prefix` followed by one suffix from `keys`.
    prefix: &'a [u8],
    /// `num_children` key suffixes of `suffix_len` bytes each, back to back.
    keys: &'a [u8],
    /// `num_children` values of `VALUE_SZ` bytes each, back to back.
    values: &'a [u8],
}
     133              : 
     134              : impl<'a, const L: usize> OnDiskNode<'a, L> {
     135              :     ///
     136              :     /// Interpret a PAGE_SZ page as a node.
     137              :     ///
     138      1564359 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     139      1564359 :         let mut cursor = std::io::Cursor::new(buf);
     140      1564359 :         let num_children = cursor.read_u16::<BE>()?;
     141      1564359 :         let level = cursor.read_u8()?;
     142      1564359 :         let prefix_len = cursor.read_u8()?;
     143      1564359 :         let suffix_len = cursor.read_u8()?;
     144              : 
     145      1564359 :         let mut off = cursor.position();
     146      1564359 :         let prefix_off = off as usize;
     147      1564359 :         off += prefix_len as u64;
     148      1564359 : 
     149      1564359 :         let keys_off = off as usize;
     150      1564359 :         let keys_len = num_children as usize * suffix_len as usize;
     151      1564359 :         off += keys_len as u64;
     152      1564359 : 
     153      1564359 :         let values_off = off as usize;
     154      1564359 :         let values_len = num_children as usize * VALUE_SZ;
     155      1564359 :         //off += values_len as u64;
     156      1564359 : 
     157      1564359 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     158      1564359 :         let keys = &buf[keys_off..keys_off + keys_len];
     159      1564359 :         let values = &buf[values_off..values_off + values_len];
     160      1564359 : 
     161      1564359 :         Ok(OnDiskNode {
     162      1564359 :             num_children,
     163      1564359 :             level,
     164      1564359 :             prefix_len,
     165      1564359 :             suffix_len,
     166      1564359 :             prefix,
     167      1564359 :             keys,
     168      1564359 :             values,
     169      1564359 :         })
     170      1564359 :     }
     171              : 
     172              :     ///
     173              :     /// Read a value at 'idx'
     174              :     ///
     175      3363783 :     fn value(&self, idx: usize) -> Value {
     176      3363783 :         let value_off = idx * VALUE_SZ;
     177      3363783 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     178      3363783 :         Value::from_slice(value_slice)
     179      3363783 :     }
     180              : 
     181      1354266 :     fn binary_search(
     182      1354266 :         &self,
     183      1354266 :         search_key: &[u8; L],
     184      1354266 :         keybuf: &mut [u8],
     185      1354266 :     ) -> result::Result<usize, usize> {
     186      1354266 :         let mut size = self.num_children as usize;
     187      1354266 :         let mut low = 0;
     188      1354266 :         let mut high = size;
     189      9957815 :         while low < high {
     190      8809752 :             let mid = low + size / 2;
     191      8809752 : 
     192      8809752 :             let key_off = mid * self.suffix_len as usize;
     193      8809752 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     194      8809752 :             // Does this match?
     195      8809752 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     196      8809752 : 
     197      8809752 :             let cmp = keybuf[..].cmp(search_key);
     198      8809752 : 
     199      8809752 :             if cmp == Ordering::Less {
     200      5597070 :                 low = mid + 1;
     201      5597070 :             } else if cmp == Ordering::Greater {
     202      3006479 :                 high = mid;
     203      3006479 :             } else {
     204       206203 :                 return Ok(mid);
     205              :             }
     206      8603549 :             size = high - low;
     207              :         }
     208      1148063 :         Err(low)
     209      1354266 :     }
     210              : }
     211              : 
///
/// Public reader object, to search the tree.
///
/// 'L' is the key length in bytes.
///
pub struct DiskBtreeReader<R, const L: usize>
where
    R: BlockReader,
{
    /// Added to every node block number before the block is read from
    /// `reader`.
    start_blk: u32,
    /// Block number of the root node, relative to `start_blk`.
    root_blk: u32,
    /// Source of the block cursor that the tree's pages are read through.
    reader: R,
}
     223              : 
/// Direction in which `DiskBtreeReader::visit` walks the keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VisitDirection {
    /// Visit keys >= the search key.
    Forwards,
    /// Visit keys <= the search key.
    Backwards,
}
     229              : 
     230              : impl<R, const L: usize> DiskBtreeReader<R, L>
     231              : where
     232              :     R: BlockReader,
     233              : {
     234       209968 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     235       209968 :         DiskBtreeReader {
     236       209968 :             start_blk,
     237       209968 :             root_blk,
     238       209968 :             reader,
     239       209968 :         }
     240       209968 :     }
     241              : 
     242              :     ///
     243              :     /// Read the value for given key. Returns the value, or None if it doesn't exist.
     244              :     ///
     245       403615 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     246       403615 :         let mut result: Option<u64> = None;
     247       403615 :         self.visit(
     248       403615 :             search_key,
     249       403615 :             VisitDirection::Forwards,
     250       403615 :             |key, value| {
     251       203591 :                 if key == search_key {
     252       201581 :                     result = Some(value);
     253       201581 :                 }
     254       203591 :                 false
     255       403615 :             },
     256       403615 :             ctx,
     257       403615 :         )
     258          299 :         .await?;
     259       403615 :         Ok(result)
     260       403615 :     }
     261              : 
    /// Return a stream which yields all key, value pairs from the index
    /// starting from the first key greater or equal to `start_key`.
    ///
    /// Each item is the full key (node prefix followed by the entry's suffix)
    /// as an owned `Vec<u8>`, together with the value of the leaf entry. An
    /// error from reading or interpreting a block is yielded as an `Err` item.
    ///
    /// Note that this is a copy of [`Self::visit`].
    /// TODO: Once the sequential read path is removed this will become
    /// the only index traversal method.
    pub fn get_stream_from<'a>(
        &'a self,
        start_key: &'a [u8; L],
        ctx: &'a RequestContext,
    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
        try_stream! {
            // Explicit traversal stack. Each entry is a node's block number
            // plus, if the node was already partially processed, the iterator
            // over its remaining child indexes.
            let mut stack = Vec::new();
            stack.push((self.root_blk, None));
            let block_cursor = self.reader.block_cursor();
            while let Some((node_blknum, opt_iter)) = stack.pop() {
                // Locate the node.
                let node_buf = block_cursor
                    .read_blk(self.start_blk + node_blknum, ctx)
                    .await?;

                let node = OnDiskNode::deparse(node_buf.as_ref())?;
                let prefix_len = node.prefix_len as usize;
                let suffix_len = node.suffix_len as usize;

                assert!(node.num_children > 0);

                // Scratch buffer for full keys: the node's prefix, followed by
                // room for one suffix.
                let mut keybuf = Vec::new();
                keybuf.extend(node.prefix);
                keybuf.resize(prefix_len + suffix_len, 0);

                let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
                    // Resuming a node that we descended from earlier.
                    iter
                } else {
                    // Locate the first match
                    let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
                        Ok(idx) => idx,
                        Err(idx) => {
                            if node.level == 0 {
                                // Imagine that the node contains the following keys:
                                //
                                // 1
                                // 3  <-- idx
                                // 5
                                //
                                // If the search key is '2' and there is no exact match,
                                // the binary search would return the index of key
                                // '3'. That's cool, '3' is the first key to return.
                                idx
                            } else {
                                // This is an internal page, so each key represents a lower
                                // bound for what's in the child page. If there is no exact
                                // match, we have to return the *previous* entry.
                                //
                                // 1  <-- return this
                                // 3  <-- idx
                                // 5
                                idx.saturating_sub(1)
                            }
                        }
                    };
                    Either::Left(idx..node.num_children.into())
                };

                // idx points to the first match now. Keep going from there
                //
                // This is a 'while let' rather than a 'for' loop because the
                // iterator is moved onto the stack when descending into a child.
                while let Some(idx) = iter.next() {
                    let key_off = idx * suffix_len;
                    let suffix = &node.keys[key_off..key_off + suffix_len];
                    keybuf[prefix_len..].copy_from_slice(suffix);
                    let value = node.value(idx);
                    #[allow(clippy::collapsible_if)]
                    if node.level == 0 {
                        // leaf
                        yield (keybuf.clone(), value.to_u64());
                    } else {
                        // Internal node: remember where to continue in this
                        // node, then process the child first (the stack is
                        // last-in, first-out).
                        stack.push((node_blknum, Some(iter)));
                        stack.push((value.to_blknum(), None));
                        break;
                    }
                }
            }
        }
    }
     345              : 
    ///
    /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
    /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
    /// backwards)
    ///
    /// 'visitor' receives the full key and the value of each leaf entry. The
    /// scan continues for as long as it returns true.
    ///
    /// Returns Ok(true) if the scan ran to the end of the tree. Returns
    /// Ok(false) if the visitor stopped the scan, or if a backwards scan
    /// reached a node in which no key is <= 'search_key'.
    ///
    pub async fn visit<V>(
        &self,
        search_key: &[u8; L],
        dir: VisitDirection,
        mut visitor: V,
        ctx: &RequestContext,
    ) -> Result<bool>
    where
        V: FnMut(&[u8], u64) -> bool,
    {
        // Explicit traversal stack. Each entry is a node's block number plus,
        // if the node was already partially processed, the iterator over its
        // remaining child indexes.
        let mut stack = Vec::new();
        stack.push((self.root_blk, None));
        let block_cursor = self.reader.block_cursor();
        while let Some((node_blknum, opt_iter)) = stack.pop() {
            // Locate the node.
            let node_buf = block_cursor
                .read_blk(self.start_blk + node_blknum, ctx)
                .await?;

            let node = OnDiskNode::deparse(node_buf.as_ref())?;
            let prefix_len = node.prefix_len as usize;
            let suffix_len = node.suffix_len as usize;

            assert!(node.num_children > 0);

            // Scratch buffer for full keys: the node's prefix, followed by
            // room for one suffix.
            let mut keybuf = Vec::new();
            keybuf.extend(node.prefix);
            keybuf.resize(prefix_len + suffix_len, 0);

            let mut iter = if let Some(iter) = opt_iter {
                // Resuming a node that we descended from earlier.
                iter
            } else if dir == VisitDirection::Forwards {
                // Locate the first match
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => idx,
                    Err(idx) => {
                        if node.level == 0 {
                            // Imagine that the node contains the following keys:
                            //
                            // 1
                            // 3  <-- idx
                            // 5
                            //
                            // If the search key is '2' and there is no exact match,
                            // the binary search would return the index of key
                            // '3'. That's cool, '3' is the first key to return.
                            idx
                        } else {
                            // This is an internal page, so each key represents a lower
                            // bound for what's in the child page. If there is no exact
                            // match, we have to return the *previous* entry.
                            //
                            // 1  <-- return this
                            // 3  <-- idx
                            // 5
                            idx.saturating_sub(1)
                        }
                    }
                };
                Either::Left(idx..node.num_children.into())
            } else {
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => {
                        // Exact match. That's the first entry to return, and walk
                        // backwards from there.
                        idx
                    }
                    Err(idx) => {
                        // No exact match. The binary search returned the index of the
                        // first key that's > search_key. Back off by one, and walk
                        // backwards from there.
                        if let Some(idx) = idx.checked_sub(1) {
                            idx
                        } else {
                            // Every key in this node is > search_key.
                            return Ok(false);
                        }
                    }
                };
                Either::Right((0..=idx).rev())
            };

            // idx points to the first match now. Keep going from there
            //
            // This is a 'while let' rather than a 'for' loop because the
            // iterator is moved onto the stack when descending into a child.
            while let Some(idx) = iter.next() {
                let key_off = idx * suffix_len;
                let suffix = &node.keys[key_off..key_off + suffix_len];
                keybuf[prefix_len..].copy_from_slice(suffix);
                let value = node.value(idx);
                #[allow(clippy::collapsible_if)]
                if node.level == 0 {
                    // leaf
                    if !visitor(&keybuf, value.to_u64()) {
                        return Ok(false);
                    }
                } else {
                    // Internal node: remember where to continue in this node,
                    // then process the child first (the stack is last-in,
                    // first-out).
                    stack.push((node_blknum, Some(iter)));
                    stack.push((value.to_blknum(), None));
                    break;
                }
            }
        }
        Ok(true)
    }
     453              : 
    /// Print the contents of the whole tree to stdout, for debugging.
    ///
    /// Every node gets a header line with its block number, path, prefix and
    /// suffix length, followed by one line per entry with the hex-encoded key
    /// suffix and value. Children are printed indented below the entry that
    /// points to them.
    ///
    /// Blocks are read with a `RequestContext` created here, using
    /// `TaskKind::DebugTool` and `DownloadBehavior::Error`.
    #[allow(dead_code)]
    pub async fn dump(&self) -> Result<()> {
        // Each stack entry is (block number, path, depth, index of the next
        // entry to print, byte offset of that entry's key within the node).
        let mut stack = Vec::new();
        let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

        stack.push((self.root_blk, String::new(), 0, 0, 0));

        let block_cursor = self.reader.block_cursor();

        // Every iteration prints exactly one entry of one node. Note that the
        // node's block is read again for each of its entries.
        while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
            let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
            let buf: &[u8] = blk.as_ref();
            let node = OnDiskNode::<L>::deparse(buf)?;

            // Print the node header once, before the node's first entry.
            if child_idx == 0 {
                print!("{:indent$}", "", indent = depth * 2);
                // Concatenate the paths of the entries that are still on the
                // stack.
                let path_prefix = stack
                    .iter()
                    .map(|(_blknum, path, ..)| path.as_str())
                    .collect::<String>();
                println!(
                    "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
                    hex::encode(node.prefix),
                    node.suffix_len
                );
            }

            // Schedule the next entry of this node, if there is one. It is
            // pushed before the child below, so the child's subtree is
            // printed first.
            if child_idx + 1 < node.num_children {
                let key_off = key_off + node.suffix_len as usize;
                stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
            }
            let key = &node.keys[key_off..key_off + node.suffix_len as usize];
            let val = node.value(child_idx as usize);

            print!("{:indent$}", "", indent = depth * 2 + 2);
            println!("{}: {}", hex::encode(key), hex::encode(val.0));

            // In an internal node the value is the block number of a child.
            // Descend into it, using this node's prefix as the child's path.
            if node.level > 0 {
                stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
            }
        }
        Ok(())
    }
     497              : }
     498              : 
///
/// Public builder object, for creating a new tree.
///
/// Usage: Create a builder object by calling 'new', load all the data into the
/// tree by calling 'append' for each key-value pair, and then call 'finish'.
/// The keys must be appended in strictly increasing order.
///
/// 'L' is the key length in bytes
pub struct DiskBtreeBuilder<W, const L: usize>
where
    W: BlockWriter,
{
    /// Destination that the tree is built into.
    writer: W,

    ///
    /// `stack[0]` is the current root page, `stack.last()` is the leaf.
    ///
    /// We maintain the length of the stack to be always greater than zero.
    /// There are two exceptions:
    /// 1. `Self::flush_node`. The method will push a new node if it extracted the last one.
    ///    Because other methods cannot see the intermediate state, the invariant still holds.
    /// 2. `Self::finish`. It consumes self and does not return it back,
    ///    which means that this is where the structure is destroyed.
    ///    Thus a stack of zero length cannot be observed by other methods.
    stack: Vec<BuildNode<L>>,

    /// Last key that was appended to the tree. Used to sanity check that append
    /// is called in increasing key order. `None` until the first append.
    last_key: Option<[u8; L]>,
}
     528              : 
     529              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     530              : where
     531              :     W: BlockWriter,
     532              : {
     533         1190 :     pub fn new(writer: W) -> Self {
     534         1190 :         DiskBtreeBuilder {
     535         1190 :             writer,
     536         1190 :             last_key: None,
     537         1190 :             stack: vec![BuildNode::new(0)],
     538         1190 :         }
     539         1190 :     }
     540              : 
     541      6606535 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     542      6606535 :         if value > MAX_VALUE {
     543            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     544      6606535 :         }
     545      6606535 :         if let Some(last_key) = &self.last_key {
     546      6605345 :             if key <= last_key {
     547            2 :                 return Err(DiskBtreeError::UnsortedInput {
     548            2 :                     key: key.as_slice().into(),
     549            2 :                     last_key: last_key.as_slice().into(),
     550            2 :                 });
     551      6605343 :             }
     552         1190 :         }
     553      6606533 :         self.last_key = Some(*key);
     554      6606533 : 
     555      6606533 :         self.append_internal(key, Value::from_u64(value))
     556      6606535 :     }
     557              : 
     558      6618841 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     559      6618841 :         // Try to append to the current leaf buffer
     560      6618841 :         let last = self
     561      6618841 :             .stack
     562      6618841 :             .last_mut()
     563      6618841 :             .expect("should always have at least one item");
     564      6618841 :         let level = last.level;
     565      6618841 :         if last.push(key, value) {
     566      6595429 :             return Ok(());
     567        23412 :         }
     568        23412 : 
     569        23412 :         // It did not fit. Try to compress, and if it succeeds to make
     570        23412 :         // some room on the node, try appending to it again.
     571        23412 :         #[allow(clippy::collapsible_if)]
     572        23412 :         if last.compress() {
     573        11873 :             if last.push(key, value) {
     574        11860 :                 return Ok(());
     575           13 :             }
     576        11539 :         }
     577              : 
     578              :         // Could not append to the current leaf. Flush it and create a new one.
     579        11552 :         self.flush_node()?;
     580              : 
     581              :         // Replace the node we flushed with an empty one and append the new
     582              :         // key to it.
     583        11552 :         let mut last = BuildNode::new(level);
     584        11552 :         if !last.push(key, value) {
     585            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     586        11552 :         }
     587        11552 : 
     588        11552 :         self.stack.push(last);
     589        11552 : 
     590        11552 :         Ok(())
     591      6618841 :     }
     592              : 
     593              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     594              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     595              :     /// creates a new root page, increasing the height of the tree.
     596        12308 :     fn flush_node(&mut self) -> Result<()> {
     597        12308 :         // Get the current bottommost node in the stack and flush it to disk.
     598        12308 :         let last = self
     599        12308 :             .stack
     600        12308 :             .pop()
     601        12308 :             .expect("should always have at least one item");
     602        12308 :         let buf = last.pack();
     603        12308 :         let downlink_key = last.first_key();
     604        12308 :         let downlink_ptr = self.writer.write_blk(buf)?;
     605              : 
     606              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     607              :         // create a new root page, increasing the height of the tree.
     608        12308 :         if self.stack.is_empty() {
     609          756 :             self.stack.push(BuildNode::new(last.level + 1));
     610        11552 :         }
     611        12308 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     612        12308 :     }
     613              : 
     614              :     ///
     615              :     /// Flushes everything to disk, and returns the block number of the root page.
     616              :     /// The caller must store the root block number "out-of-band", and pass it
     617              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
     618              :     /// (In the image and delta layers, it is stored in the beginning of the file,
     619              :     /// in the summary header)
     620              :     ///
     621         1188 :     pub fn finish(mut self) -> Result<(u32, W)> {
     622              :         // flush all levels, except the root.
     623         1944 :         while self.stack.len() > 1 {
     624          756 :             self.flush_node()?;
     625              :         }
     626              : 
     627         1188 :         let root = self
     628         1188 :             .stack
     629         1188 :             .first()
     630         1188 :             .expect("by the check above we left one item there");
     631         1188 :         let buf = root.pack();
     632         1188 :         let root_blknum = self.writer.write_blk(buf)?;
     633              : 
     634         1188 :         Ok((root_blknum, self.writer))
     635         1188 :     }
     636              : 
     637      2021974 :     pub fn borrow_writer(&self) -> &W {
     638      2021974 :         &self.writer
     639      2021974 :     }
     640              : }
     641              : 
     642              : ///
     643              : /// BuildNode represesnts an incomplete page that we are appending to.
     644              : ///
     645              : #[derive(Clone, Debug)]
     646              : struct BuildNode<const L: usize> {
     647              :     num_children: u16,
     648              :     level: u8,
     649              :     prefix: Vec<u8>,
     650              :     suffix_len: usize,
     651              : 
     652              :     keys: Vec<u8>,
     653              :     values: Vec<u8>,
     654              : 
     655              :     size: usize, // physical size of this node, if it was written to disk like this
     656              : }
     657              : 
     658              : const NODE_SIZE: usize = PAGE_SZ;
     659              : 
     660              : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     661              : 
     662              : impl<const L: usize> BuildNode<L> {
     663        13498 :     fn new(level: u8) -> Self {
     664        13498 :         BuildNode {
     665        13498 :             num_children: 0,
     666        13498 :             level,
     667        13498 :             prefix: Vec::new(),
     668        13498 :             suffix_len: 0,
     669        13498 :             keys: Vec::new(),
     670        13498 :             values: Vec::new(),
     671        13498 :             size: NODE_HDR_SIZE,
     672        13498 :         }
     673        13498 :     }
     674              : 
     675              :     /// Try to append a key-value pair to this node. Returns 'true' on
     676              :     /// success, 'false' if the page was full or the key was
     677              :     /// incompatible with the prefix of the existing keys.
     678      6642266 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     679      6642266 :         // If we have already performed prefix-compression on the page,
     680      6642266 :         // check that the incoming key has the same prefix.
     681      6642266 :         if self.num_children > 0 {
     682              :             // does the prefix allow it?
     683      6628768 :             if !key.starts_with(&self.prefix) {
     684          223 :                 return false;
     685      6628545 :             }
     686        13498 :         } else {
     687        13498 :             self.suffix_len = key.len();
     688        13498 :         }
     689              : 
     690              :         // Is the node too full?
     691      6642043 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     692        23202 :             return false;
     693      6618841 :         }
     694      6618841 : 
     695      6618841 :         // All clear
     696      6618841 :         self.num_children += 1;
     697      6618841 :         self.keys.extend(&key[self.prefix.len()..]);
     698      6618841 :         self.values.extend(value.0);
     699      6618841 : 
     700      6618841 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     701      6618841 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     702              : 
     703      6618841 :         self.size += self.suffix_len + VALUE_SZ;
     704      6618841 : 
     705      6618841 :         true
     706      6642266 :     }
     707              : 
     708              :     ///
     709              :     /// Perform prefix-compression.
     710              :     ///
     711              :     /// Returns 'true' on success, 'false' if no compression was possible.
     712              :     ///
     713        23412 :     fn compress(&mut self) -> bool {
     714        23412 :         let first_suffix = self.first_suffix();
     715        23412 :         let last_suffix = self.last_suffix();
     716        23412 : 
     717        23412 :         // Find the common prefix among all keys
     718        23412 :         let mut prefix_len = 0;
     719       212885 :         while prefix_len < self.suffix_len {
     720       212885 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     721        23412 :                 break;
     722       189473 :             }
     723       189473 :             prefix_len += 1;
     724              :         }
     725        23412 :         if prefix_len == 0 {
     726        11539 :             return false;
     727        11873 :         }
     728        11873 : 
     729        11873 :         // Can compress. Rewrite the keys without the common prefix.
     730        11873 :         self.prefix.extend(&self.keys[..prefix_len]);
     731        11873 : 
     732        11873 :         let mut new_keys = Vec::new();
     733        11873 :         let mut key_off = 0;
     734      3171575 :         while key_off < self.keys.len() {
     735      3159702 :             let next_key_off = key_off + self.suffix_len;
     736      3159702 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     737      3159702 :             key_off = next_key_off;
     738      3159702 :         }
     739        11873 :         self.keys = new_keys;
     740        11873 :         self.suffix_len -= prefix_len;
     741        11873 : 
     742        11873 :         self.size -= prefix_len * self.num_children as usize;
     743        11873 :         self.size += prefix_len;
     744        11873 : 
     745        11873 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     746        11873 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     747              : 
     748        11873 :         true
     749        23412 :     }
     750              : 
     751              :     ///
     752              :     /// Serialize the node to on-disk format.
     753              :     ///
     754        13496 :     fn pack(&self) -> Bytes {
     755        13496 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     756        13496 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     757        13496 :         assert!(self.num_children > 0);
     758              : 
     759        13496 :         let mut buf = BytesMut::new();
     760        13496 : 
     761        13496 :         buf.put_u16(self.num_children);
     762        13496 :         buf.put_u8(self.level);
     763        13496 :         buf.put_u8(self.prefix.len() as u8);
     764        13496 :         buf.put_u8(self.suffix_len as u8);
     765        13496 :         buf.put(&self.prefix[..]);
     766        13496 :         buf.put(&self.keys[..]);
     767        13496 :         buf.put(&self.values[..]);
     768        13496 : 
     769        13496 :         assert!(buf.len() == self.size);
     770              : 
     771        13496 :         assert!(buf.len() <= PAGE_SZ);
     772        13496 :         buf.resize(PAGE_SZ, 0);
     773        13496 :         buf.freeze()
     774        13496 :     }
     775              : 
     776        35720 :     fn first_suffix(&self) -> &[u8] {
     777        35720 :         &self.keys[..self.suffix_len]
     778        35720 :     }
     779        23412 :     fn last_suffix(&self) -> &[u8] {
     780        23412 :         &self.keys[self.keys.len() - self.suffix_len..]
     781        23412 :     }
     782              : 
     783              :     /// Return the full first key of the page, including the prefix
     784        12308 :     fn first_key(&self) -> [u8; L] {
     785        12308 :         let mut key = [0u8; L];
     786        12308 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     787        12308 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     788        12308 :         key
     789        12308 :     }
     790              : }
     791              : 
     792              : #[cfg(test)]
     793              : pub(crate) mod tests {
     794              :     use super::*;
     795              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     796              :     use rand::Rng;
     797              :     use std::collections::BTreeMap;
     798              :     use std::sync::atomic::{AtomicUsize, Ordering};
     799              : 
     800              :     #[derive(Clone, Default)]
     801              :     pub(crate) struct TestDisk {
     802              :         blocks: Vec<Bytes>,
     803              :     }
     804              :     impl TestDisk {
     805           10 :         fn new() -> Self {
     806           10 :             Self::default()
     807           10 :         }
     808      1016236 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     809      1016236 :             let mut buf = [0u8; PAGE_SZ];
     810      1016236 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     811      1016236 :             Ok(std::sync::Arc::new(buf).into())
     812      1016236 :         }
     813              :     }
     814              :     impl BlockReader for TestDisk {
     815       411153 :         fn block_cursor(&self) -> BlockCursor<'_> {
     816       411153 :             BlockCursor::new(BlockReaderRef::TestDisk(self))
     817       411153 :         }
     818              :     }
     819              :     impl BlockWriter for &mut TestDisk {
     820          216 :         fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
     821          216 :             let blknum = self.blocks.len();
     822          216 :             self.blocks.push(buf);
     823          216 :             Ok(blknum as u32)
     824          216 :         }
     825              :     }
     826              : 
     827              :     #[tokio::test]
     828            2 :     async fn basic() -> Result<()> {
     829            2 :         let mut disk = TestDisk::new();
     830            2 :         let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
     831            2 : 
     832            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     833            2 : 
     834            2 :         let all_keys: Vec<&[u8; 6]> = vec![
     835            2 :             b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
     836            2 :         ];
     837            2 :         let all_data: Vec<(&[u8; 6], u64)> = all_keys
     838            2 :             .iter()
     839            2 :             .enumerate()
     840           16 :             .map(|(idx, key)| (*key, idx as u64))
     841            2 :             .collect();
     842           16 :         for (key, val) in all_data.iter() {
     843           16 :             writer.append(key, *val)?;
     844            2 :         }
     845            2 : 
     846            2 :         let (root_offset, _writer) = writer.finish()?;
     847            2 : 
     848            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     849            2 : 
     850            2 :         reader.dump().await?;
     851            2 : 
     852            2 :         // Test the `get` function on all the keys.
     853           16 :         for (key, val) in all_data.iter() {
     854           16 :             assert_eq!(reader.get(key, &ctx).await?, Some(*val));
     855            2 :         }
     856            2 :         // And on some keys that don't exist
     857            2 :         assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
     858            2 :         assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
     859            2 :         assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
     860            2 : 
     861            2 :         // Test search with `visit` function
     862            2 :         let search_key = b"xabaaa";
     863            2 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     864            2 :             .iter()
     865           16 :             .filter(|(key, _value)| key[..] >= search_key[..])
     866           10 :             .map(|(key, value)| (key.to_vec(), *value))
     867            2 :             .collect();
     868            2 : 
     869            2 :         let mut data = Vec::new();
     870            2 :         reader
     871            2 :             .visit(
     872            2 :                 search_key,
     873            2 :                 VisitDirection::Forwards,
     874           10 :                 |key, value| {
     875           10 :                     data.push((key.to_vec(), value));
     876           10 :                     true
     877           10 :                 },
     878            2 :                 &ctx,
     879            2 :             )
     880            2 :             .await?;
     881            2 :         assert_eq!(data, expected);
     882            2 : 
     883            2 :         // Test a backwards scan
     884            2 :         let mut expected: Vec<(Vec<u8>, u64)> = all_data
     885            2 :             .iter()
     886           16 :             .filter(|(key, _value)| key[..] <= search_key[..])
     887            8 :             .map(|(key, value)| (key.to_vec(), *value))
     888            2 :             .collect();
     889            2 :         expected.reverse();
     890            2 :         let mut data = Vec::new();
     891            2 :         reader
     892            2 :             .visit(
     893            2 :                 search_key,
     894            2 :                 VisitDirection::Backwards,
     895            8 :                 |key, value| {
     896            8 :                     data.push((key.to_vec(), value));
     897            8 :                     true
     898            8 :                 },
     899            2 :                 &ctx,
     900            2 :             )
     901            2 :             .await?;
     902            2 :         assert_eq!(data, expected);
     903            2 : 
     904            2 :         // Backward scan where nothing matches
     905            2 :         reader
     906            2 :             .visit(
     907            2 :                 b"aaaaaa",
     908            2 :                 VisitDirection::Backwards,
     909            2 :                 |key, value| {
     910            0 :                     panic!("found unexpected key {}: {}", hex::encode(key), value);
     911            2 :                 },
     912            2 :                 &ctx,
     913            2 :             )
     914            2 :             .await?;
     915            2 : 
     916            2 :         // Full scan
     917            2 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     918            2 :             .iter()
     919           16 :             .map(|(key, value)| (key.to_vec(), *value))
     920            2 :             .collect();
     921            2 :         let mut data = Vec::new();
     922            2 :         reader
     923            2 :             .visit(
     924            2 :                 &[0u8; 6],
     925            2 :                 VisitDirection::Forwards,
     926           16 :                 |key, value| {
     927           16 :                     data.push((key.to_vec(), value));
     928           16 :                     true
     929           16 :                 },
     930            2 :                 &ctx,
     931            2 :             )
     932            2 :             .await?;
     933            2 :         assert_eq!(data, expected);
     934            2 : 
     935            2 :         Ok(())
     936            2 :     }
     937              : 
     938              :     #[tokio::test]
     939            2 :     async fn lots_of_keys() -> Result<()> {
     940            2 :         let mut disk = TestDisk::new();
     941            2 :         let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
     942            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     943            2 : 
     944            2 :         const NUM_KEYS: u64 = 1000;
     945            2 : 
     946            2 :         let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
     947            2 : 
     948         2002 :         for idx in 0..NUM_KEYS {
     949         2000 :             let key_int: u64 = 1 + idx * 2;
     950         2000 :             let key = u64::to_be_bytes(key_int);
     951         2000 :             writer.append(&key, idx)?;
     952            2 : 
     953         2000 :             all_data.insert(key_int, idx);
     954            2 :         }
     955            2 : 
     956            2 :         let (root_offset, _writer) = writer.finish()?;
     957            2 : 
     958            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     959            2 : 
     960            2 :         reader.dump().await?;
     961            2 : 
     962            2 :         use std::sync::Mutex;
     963            2 : 
     964            2 :         let result = Mutex::new(Vec::new());
     965            2 :         let limit: AtomicUsize = AtomicUsize::new(10);
     966        83820 :         let take_ten = |key: &[u8], value: u64| {
     967        83820 :             let mut keybuf = [0u8; 8];
     968        83820 :             keybuf.copy_from_slice(key);
     969        83820 :             let key_int = u64::from_be_bytes(keybuf);
     970        83820 : 
     971        83820 :             let mut result = result.lock().unwrap();
     972        83820 :             result.push((key_int, value));
     973        83820 : 
     974        83820 :             // keep going until we have 10 matches
     975        83820 :             result.len() < limit.load(Ordering::Relaxed)
     976        83820 :         };
     977            2 : 
     978         4020 :         for search_key_int in 0..(NUM_KEYS * 2 + 10) {
     979         4020 :             let search_key = u64::to_be_bytes(search_key_int);
     980         4020 :             assert_eq!(
     981         4020 :                 reader.get(&search_key, &ctx).await?,
     982         4020 :                 all_data.get(&search_key_int).cloned()
     983            2 :             );
     984            2 : 
     985            2 :             // Test a forward scan starting with this key
     986         4020 :             result.lock().unwrap().clear();
     987         4020 :             reader
     988         4020 :                 .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
     989            2 :                 .await?;
     990         4020 :             let expected = all_data
     991         4020 :                 .range(search_key_int..)
     992         4020 :                 .take(10)
     993        39820 :                 .map(|(&key, &val)| (key, val))
     994         4020 :                 .collect::<Vec<(u64, u64)>>();
     995         4020 :             assert_eq!(*result.lock().unwrap(), expected);
     996            2 : 
     997            2 :             // And a backwards scan
     998         4020 :             result.lock().unwrap().clear();
     999         4020 :             reader
    1000         4020 :                 .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1001            2 :                 .await?;
    1002         4020 :             let expected = all_data
    1003         4020 :                 .range(..=search_key_int)
    1004         4020 :                 .rev()
    1005         4020 :                 .take(10)
    1006        40000 :                 .map(|(&key, &val)| (key, val))
    1007         4020 :                 .collect::<Vec<(u64, u64)>>();
    1008         4020 :             assert_eq!(*result.lock().unwrap(), expected);
    1009            2 :         }
    1010            2 : 
    1011            2 :         // full scan
    1012            2 :         let search_key = u64::to_be_bytes(0);
    1013            2 :         limit.store(usize::MAX, Ordering::Relaxed);
    1014            2 :         result.lock().unwrap().clear();
    1015            2 :         reader
    1016            2 :             .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1017            2 :             .await?;
    1018            2 :         let expected = all_data
    1019            2 :             .iter()
    1020         2000 :             .map(|(&key, &val)| (key, val))
    1021            2 :             .collect::<Vec<(u64, u64)>>();
    1022            2 :         assert_eq!(*result.lock().unwrap(), expected);
    1023            2 : 
    1024            2 :         // full scan
    1025            2 :         let search_key = u64::to_be_bytes(u64::MAX);
    1026            2 :         limit.store(usize::MAX, Ordering::Relaxed);
    1027            2 :         result.lock().unwrap().clear();
    1028            2 :         reader
    1029            2 :             .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1030            2 :             .await?;
    1031            2 :         let expected = all_data
    1032            2 :             .iter()
    1033            2 :             .rev()
    1034         2000 :             .map(|(&key, &val)| (key, val))
    1035            2 :             .collect::<Vec<(u64, u64)>>();
    1036            2 :         assert_eq!(*result.lock().unwrap(), expected);
    1037            2 : 
    1038            2 :         Ok(())
    1039            2 :     }
    1040              : 
    1041              :     #[tokio::test]
    1042            2 :     async fn random_data() -> Result<()> {
    1043            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1044            2 : 
    1045            2 :         // Generate random keys with exponential distribution, to
    1046            2 :         // exercise the prefix compression
    1047            2 :         const NUM_KEYS: usize = 100000;
    1048            2 :         let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
    1049       200002 :         for idx in 0..NUM_KEYS {
    1050       200000 :             let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
    1051       200000 :             let t = -(f64::ln(u));
    1052       200000 :             let key_int = (t * 1000000.0) as u128;
    1053       200000 : 
    1054       200000 :             all_data.insert(key_int, idx as u64);
    1055       200000 :         }
    1056            2 : 
    1057            2 :         // Build a tree from it
    1058            2 :         let mut disk = TestDisk::new();
    1059            2 :         let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
    1060            2 : 
    1061       195043 :         for (&key, &val) in all_data.iter() {
    1062       195043 :             writer.append(&u128::to_be_bytes(key), val)?;
    1063            2 :         }
    1064            2 :         let (root_offset, _writer) = writer.finish()?;
    1065            2 : 
    1066            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1067            2 : 
    1068            2 :         // Test get() operation on all the keys
    1069       195043 :         for (&key, &val) in all_data.iter() {
    1070       195043 :             let search_key = u128::to_be_bytes(key);
    1071       195043 :             assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
    1072            2 :         }
    1073            2 : 
    1074            2 :         // Test get() operations on random keys, most of which will not exist
    1075       200002 :         for _ in 0..100000 {
    1076       200000 :             let key_int = rand::thread_rng().gen::<u128>();
    1077       200000 :             let search_key = u128::to_be_bytes(key_int);
    1078       200000 :             assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
    1079            2 :         }
    1080            2 : 
    1081            2 :         // Test boundary cases
    1082            2 :         assert!(
    1083            2 :             reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
    1084            2 :                 == all_data.get(&u128::MIN).cloned()
    1085            2 :         );
    1086            2 :         assert!(
    1087            2 :             reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
    1088            2 :                 == all_data.get(&u128::MAX).cloned()
    1089            2 :         );
    1090            2 : 
    1091            2 :         Ok(())
    1092            2 :     }
    1093              : 
    1094              :     #[test]
    1095            2 :     fn unsorted_input() {
    1096            2 :         let mut disk = TestDisk::new();
    1097            2 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1098            2 : 
    1099            2 :         let _ = writer.append(b"ba", 1);
    1100            2 :         let _ = writer.append(b"bb", 2);
    1101            2 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1102            2 :         match err {
    1103            2 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1104            2 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1105            2 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1106              :             }
    1107            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1108              :         }
    1109            2 :     }
    1110              : 
    1111              :     ///
    1112              :     /// This test contains a particular data set, see disk_btree_test_data.rs
    1113              :     ///
    1114              :     #[tokio::test]
    1115            2 :     async fn particular_data() -> Result<()> {
    1116            2 :         // Build a tree from it
    1117            2 :         let mut disk = TestDisk::new();
    1118            2 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1119            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1120            2 : 
    1121         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1122         4000 :             writer.append(&key, val)?;
    1123            2 :         }
    1124            2 :         let (root_offset, writer) = writer.finish()?;
    1125            2 : 
    1126            2 :         println!("SIZE: {} blocks", writer.blocks.len());
    1127            2 : 
    1128            2 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1129            2 : 
    1130            2 :         // Test get() operation on all the keys
    1131         4002 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1132         4000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1133            2 :         }
    1134            2 : 
    1135            2 :         // Test full scan
    1136            2 :         let mut count = 0;
    1137            2 :         reader
    1138            2 :             .visit(
    1139            2 :                 &[0u8; 26],
    1140            2 :                 VisitDirection::Forwards,
    1141         4000 :                 |_key, _value| {
    1142         4000 :                     count += 1;
    1143         4000 :                     true
    1144         4000 :                 },
    1145            2 :                 &ctx,
    1146            2 :             )
    1147            2 :             .await?;
    1148            2 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1149            2 : 
    1150            2 :         reader.dump().await?;
    1151            2 : 
    1152            2 :         Ok(())
    1153            2 :     }
    1154              : }
    1155              : 
    1156              : #[cfg(test)]
    1157              : #[path = "disk_btree_test_data.rs"]
    1158              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta