LCOV - code coverage report
Current view: top level - pageserver/src/tenant - disk_btree.rs (source / functions)
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info
Test Date: 2025-04-24 20:31:15
Coverage: Lines: 99.2 % (840 of 847 hit)    Functions: 86.1 % (173 of 201 hit)

            Line data    Source code
       1              : //!
       2              : //! Simple on-disk B-tree implementation
       3              : //!
       4              : //! This is used as the index structure within image and delta layers
       5              : //!
       6              : //! Features:
       7              : //! - Fixed-width keys
       8              : //! - Fixed-width values (VALUE_SZ)
       9              : //! - The tree is created in a bulk operation. Insert/deletion after creation
      10              : //!   is not supported
      11              : //! - page-oriented
      12              : //!
      13              : //! TODO:
      14              : //! - maybe something like an Adaptive Radix Tree would be more efficient?
      15              : //! - the values stored by image and delta layers are offsets into the file,
      16              : //!   and they are in monotonically increasing order. Prefix compression would
      17              : //!   be very useful for them, too.
      18              : //! - An Iterator interface would be more convenient for the callers than the
      19              : //!   'visit' function
      20              : //!
      21              : use std::cmp::Ordering;
      22              : use std::iter::Rev;
      23              : use std::ops::{Range, RangeInclusive};
      24              : use std::{io, result};
      25              : 
      26              : use async_stream::try_stream;
      27              : use byteorder::{BE, ReadBytesExt};
      28              : use bytes::BufMut;
      29              : use either::Either;
      30              : use futures::{Stream, StreamExt};
      31              : use hex;
      32              : use thiserror::Error;
      33              : use tracing::error;
      34              : 
      35              : use crate::context::RequestContext;
      36              : use crate::tenant::block_io::{BlockReader, BlockWriter};
      37              : use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
      38              : 
      39              : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
      40              : pub const VALUE_SZ: usize = 5;
      41              : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
      42              : 
      43              : pub const PAGE_SZ: usize = 8192;
      44              : 
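                      : // A Value packs a 40-bit big-endian integer into VALUE_SZ (5) bytes. Two kinds of
                      : // values are stored in this encoding: plain u64 offsets (from_u64/to_u64, limited
                      : // to MAX_VALUE) and child block numbers (from_blknum/to_blknum), which are tagged
                      : // by setting the high bit of the first byte (0x80) so the two can be told apart
                      : // (see is_offset).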
      45              : #[derive(Clone, Copy, Debug)]
      46              : struct Value([u8; VALUE_SZ]);
      47              : 
      48              : impl Value {
      49     54684992 :     fn from_slice(slice: &[u8]) -> Value {
      50     54684992 :         let mut b = [0u8; VALUE_SZ];
      51     54684992 :         b.copy_from_slice(slice);
      52     54684992 :         Value(b)
      53     54684992 :     }
      54              : 
      55     40491813 :     fn from_u64(x: u64) -> Value {
      56     40491813 :         assert!(x <= 0x007f_ffff_ffff);
      57     40491813 :         Value([
      58     40491813 :             (x >> 32) as u8,
      59     40491813 :             (x >> 24) as u8,
      60     40491813 :             (x >> 16) as u8,
      61     40491813 :             (x >> 8) as u8,
      62     40491813 :             x as u8,
      63     40491813 :         ])
      64     40491813 :     }
      65              : 
      66        74936 :     fn from_blknum(x: u32) -> Value {
      67        74936 :         Value([
      68        74936 :             0x80,
      69        74936 :             (x >> 24) as u8,
      70        74936 :             (x >> 16) as u8,
      71        74936 :             (x >> 8) as u8,
      72        74936 :             x as u8,
      73        74936 :         ])
      74        74936 :     }
      75              : 
      76              :     #[allow(dead_code)]
      77            0 :     fn is_offset(self) -> bool {
      78            0 :         self.0[0] & 0x80 != 0
      79            0 :     }
      80              : 
      81     50731893 :     fn to_u64(self) -> u64 {
      82     50731893 :         let b = &self.0;
      83     50731893 :         ((b[0] as u64) << 32)
      84     50731893 :             | ((b[1] as u64) << 24)
      85     50731893 :             | ((b[2] as u64) << 16)
      86     50731893 :             | ((b[3] as u64) << 8)
      87     50731893 :             | b[4] as u64
      88     50731893 :     }
      89              : 
      90      3916955 :     fn to_blknum(self) -> u32 {
      91      3916955 :         let b = &self.0;
      92      3916955 :         assert!(b[0] == 0x80);
      93      3916955 :         ((b[1] as u32) << 24) | ((b[2] as u32) << 16) | ((b[3] as u32) << 8) | b[4] as u32
      94      3916955 :     }
      95              : }
      96              : 
      97              : #[derive(Error, Debug)]
      98              : pub enum DiskBtreeError {
      99              :     #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
     100              :     AppendOverflow(u64),
     101              : 
     102              :     #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
     103              :     UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
     104              : 
     105              :     #[error("Could not push to new leaf node")]
     106              :     FailedToPushToNewLeafNode,
     107              : 
     108              :     #[error("IoError: {0}")]
     109              :     Io(#[from] io::Error),
     110              : }
     111              : 
     112              : pub type Result<T> = result::Result<T, DiskBtreeError>;
     113              : 
     114              : /// This is the on-disk representation.
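                      : ///
                      : /// Each node occupies one PAGE_SZ page. The layout, as written by `BuildNode::pack`
                      : /// and read back by `deparse`, is (a sketch; sizes in bytes):
                      : ///
                      : ///   num_children: u16 (big-endian), level: u8, prefix_len: u8, suffix_len: u8
                      : ///   prefix:  prefix_len bytes, shared by all keys on the page
                      : ///   keys:    num_children * suffix_len bytes of key suffixes, in sorted order
                      : ///   values:  num_children * VALUE_SZ bytes
                      : ///   padding: zeroes up to PAGE_SZ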
     115              : struct OnDiskNode<'a, const L: usize> {
     116              :     // Fixed-width fields
     117              :     num_children: u16,
     118              :     level: u8,
     119              :     prefix_len: u8,
     120              :     suffix_len: u8,
     121              : 
     122              :     // Variable-length fields. These are stored on-disk after the fixed-width
     123              :     // fields, in this order. In the in-memory representation, these point to
     124              :     // the right parts in the page buffer.
     125              :     prefix: &'a [u8],
     126              :     keys: &'a [u8],
     127              :     values: &'a [u8],
     128              : }
     129              : 
     130              : impl<const L: usize> OnDiskNode<'_, L> {
     131              :     ///
     132              :     /// Interpret a PAGE_SZ page as a node.
     133              :     ///
     134      9476504 :     fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
     135      9476504 :         let mut cursor = std::io::Cursor::new(buf);
     136      9476504 :         let num_children = cursor.read_u16::<BE>()?;
     137      9476504 :         let level = cursor.read_u8()?;
     138      9476504 :         let prefix_len = cursor.read_u8()?;
     139      9476504 :         let suffix_len = cursor.read_u8()?;
     140              : 
     141      9476504 :         let mut off = cursor.position();
     142      9476504 :         let prefix_off = off as usize;
     143      9476504 :         off += prefix_len as u64;
     144      9476504 : 
     145      9476504 :         let keys_off = off as usize;
     146      9476504 :         let keys_len = num_children as usize * suffix_len as usize;
     147      9476504 :         off += keys_len as u64;
     148      9476504 : 
     149      9476504 :         let values_off = off as usize;
     150      9476504 :         let values_len = num_children as usize * VALUE_SZ;
     151      9476504 :         //off += values_len as u64;
     152      9476504 : 
     153      9476504 :         let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
     154      9476504 :         let keys = &buf[keys_off..keys_off + keys_len];
     155      9476504 :         let values = &buf[values_off..values_off + values_len];
     156      9476504 : 
     157      9476504 :         Ok(OnDiskNode {
     158      9476504 :             num_children,
     159      9476504 :             level,
     160      9476504 :             prefix_len,
     161      9476504 :             suffix_len,
     162      9476504 :             prefix,
     163      9476504 :             keys,
     164      9476504 :             values,
     165      9476504 :         })
     166      9476504 :     }
     167              : 
     168              :     ///
     169              :     /// Read a value at 'idx'
     170              :     ///
     171     54684992 :     fn value(&self, idx: usize) -> Value {
     172     54684992 :         let value_off = idx * VALUE_SZ;
     173     54684992 :         let value_slice = &self.values[value_off..value_off + VALUE_SZ];
     174     54684992 :         Value::from_slice(value_slice)
     175     54684992 :     }
     176              : 
     177      8109891 :     fn binary_search(
     178      8109891 :         &self,
     179      8109891 :         search_key: &[u8; L],
     180      8109891 :         keybuf: &mut [u8],
     181      8109891 :     ) -> result::Result<usize, usize> {
     182      8109891 :         let mut size = self.num_children as usize;
     183      8109891 :         let mut low = 0;
     184      8109891 :         let mut high = size;
     185     61207286 :         while low < high {
     186     54747149 :             let mid = low + size / 2;
     187     54747149 : 
     188     54747149 :             let key_off = mid * self.suffix_len as usize;
     189     54747149 :             let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
     190     54747149 :             // Does this match?
     191     54747149 :             keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
     192     54747149 : 
     193     54747149 :             let cmp = keybuf[..].cmp(search_key);
     194     54747149 : 
     195     54747149 :             if cmp == Ordering::Less {
     196     34336288 :                 low = mid + 1;
     197     34336288 :             } else if cmp == Ordering::Greater {
     198     18761107 :                 high = mid;
     199     18761107 :             } else {
     200      1649754 :                 return Ok(mid);
     201              :             }
     202     53097395 :             size = high - low;
     203              :         }
     204      6460137 :         Err(low)
     205      8109891 :     }
     206              : }
     207              : 
     208              : ///
     209              : /// Public reader object, to search the tree.
     210              : ///
     211              : #[derive(Clone)]
     212              : pub struct DiskBtreeReader<R, const L: usize>
     213              : where
     214              :     R: BlockReader,
     215              : {
     216              :     start_blk: u32,
     217              :     root_blk: u32,
     218              :     reader: R,
     219              : }
     220              : 
     221              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     222              : pub enum VisitDirection {
     223              :     Forwards,
     224              :     Backwards,
     225              : }
     226              : 
     227              : impl<R, const L: usize> DiskBtreeReader<R, L>
     228              : where
     229              :     R: BlockReader,
     230              : {
     231      1607647 :     pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
     232      1607647 :         DiskBtreeReader {
     233      1607647 :             start_blk,
     234      1607647 :             root_blk,
     235      1607647 :             reader,
     236      1607647 :         }
     237      1607647 :     }
     238              : 
     239              :     ///
      240              :     /// Read the value for the given key. Returns the value, or None if it doesn't exist.
     241              :     ///
     242      2418765 :     pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
     243      2418765 :         let mut result: Option<u64> = None;
     244      2418765 :         self.visit(
     245      2418765 :             search_key,
     246      2418765 :             VisitDirection::Forwards,
     247      2418765 :             |key, value| {
     248      1218621 :                 if key == search_key {
     249      1206588 :                     result = Some(value);
     250      1206588 :                 }
     251      1218621 :                 false
     252      2418765 :             },
     253      2418765 :             ctx,
     254      2418765 :         )
     255      2418765 :         .await?;
     256      2418765 :         Ok(result)
     257      2418765 :     }
     258              : 
     259         4224 :     pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
     260         4224 :     where
     261         4224 :         R: 'a + Send,
     262         4224 :     {
     263         4224 :         DiskBtreeIterator {
     264         4224 :             stream: Box::pin(self.into_stream(start_key, ctx)),
     265         4224 :         }
     266         4224 :     }
     267              : 
      268              :     /// Return a stream which yields all key-value pairs from the index,
      269              :     /// starting from the first key greater than or equal to `start_key`.
     270              :     ///
      271              :     /// Note 1: this is a copy of [`Self::visit`].
     272              :     /// TODO: Once the sequential read path is removed this will become
     273              :     /// the only index traversal method.
     274              :     ///
     275              :     /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
     276              :     /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
     277              :     /// requires the reader to be present when the stream is used, and this creates a lifetime
     278              :     /// dependency between the reader and the stream. Now if we want to create an iterator that
     279              :     /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
     280              :     /// to use from the image/delta layer APIs.
     281              :     ///
     282              :     /// Feel free to add the `&self` variant back if it's necessary.
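                      :     ///
                      :     /// A minimal consumption sketch (assuming a `reader: DiskBtreeReader<_, L>`, a
                      :     /// `start_key: [u8; L]` and a `ctx: &RequestContext` are already at hand), using
                      :     /// the `iter` wrapper above, which pins the stream returned by this function:
                      :     ///
                      :     /// ```ignore
                      :     /// let mut it = reader.iter(&start_key, ctx);
                      :     /// while let Some(res) = it.next().await {
                      :     ///     let (key, value) = res?;
                      :     ///     // keys arrive in ascending order, starting at the first key >= start_key
                      :     /// }
                      :     /// ```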
     283      1723459 :     pub fn into_stream<'a>(
     284      1723459 :         self,
     285      1723459 :         start_key: &'a [u8; L],
     286      1723459 :         ctx: &'a RequestContext,
     287      1723459 :     ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
     288      1723459 :     where
     289      1723459 :         R: 'a,
     290      1723459 :     {
     291      1723459 :         try_stream! {
     292      1723459 :             let mut stack = Vec::new();
     293      1723459 :             stack.push((self.root_blk, None));
     294      1723459 :             let block_cursor = self.reader.block_cursor();
     295      1723459 :             let mut node_buf = [0_u8; PAGE_SZ];
     296      1723459 :             while let Some((node_blknum, opt_iter)) = stack.pop() {
     297      1723459 :                 // Read the node, through the PS PageCache, into local variable `node_buf`.
     298      1723459 :                 // We could keep the page cache read guard alive, but, at the time of writing,
      299      1723459 :                 // we run with quite small PS PageCaches => can't risk running out of
     300      1723459 :                 // PageCache space because this stream isn't consumed fast enough.
     301      1723459 :                 let page_read_guard = block_cursor
     302      1723459 :                     .read_blk(self.start_blk + node_blknum, ctx)
     303      1723459 :                     .await?;
     304      1723459 :                 node_buf.copy_from_slice(page_read_guard.as_ref());
     305      1723459 :                 drop(page_read_guard); // drop page cache read guard early
     306      1723459 : 
     307      1723459 :                 let node = OnDiskNode::deparse(&node_buf)?;
     308      1723459 :                 let prefix_len = node.prefix_len as usize;
     309      1723459 :                 let suffix_len = node.suffix_len as usize;
     310      1723459 : 
     311      1723459 :                 assert!(node.num_children > 0);
     312      1723459 : 
     313      1723459 :                 let mut keybuf = Vec::new();
     314      1723459 :                 keybuf.extend(node.prefix);
     315      1723459 :                 keybuf.resize(prefix_len + suffix_len, 0);
     316      1723459 : 
     317      1723459 :                 let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
     318      1723459 :                     iter
     319      1723459 :                 } else {
     320      1723459 :                     // Locate the first match
     321      1723459 :                     let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
     322      1723459 :                         Ok(idx) => idx,
     323      1723459 :                         Err(idx) => {
     324      1723459 :                             if node.level == 0 {
     325      1723459 :                                 // Imagine that the node contains the following keys:
     326      1723459 :                                 //
     327      1723459 :                                 // 1
     328      1723459 :                                 // 3  <-- idx
     329      1723459 :                                 // 5
     330      1723459 :                                 //
      331      1723459 :                                 // If the search key is '2' and there is no exact match,
     332      1723459 :                                 // the binary search would return the index of key
     333      1723459 :                                 // '3'. That's cool, '3' is the first key to return.
     334      1723459 :                                 idx
     335      1723459 :                             } else {
     336      1723459 :                                 // This is an internal page, so each key represents a lower
     337      1723459 :                                 // bound for what's in the child page. If there is no exact
     338      1723459 :                                 // match, we have to return the *previous* entry.
     339      1723459 :                                 //
     340      1723459 :                                 // 1  <-- return this
     341      1723459 :                                 // 3  <-- idx
     342      1723459 :                                 // 5
     343      1723459 :                                 idx.saturating_sub(1)
     344      1723459 :                             }
     345      1723459 :                         }
     346      1723459 :                     };
     347      1723459 :                     Either::Left(idx..node.num_children.into())
     348      1723459 :                 };
     349      1723459 : 
     350      1723459 : 
     351      1723459 :                 // idx points to the first match now. Keep going from there
     352      1723459 :                 while let Some(idx) = iter.next() {
     353      1723459 :                     let key_off = idx * suffix_len;
     354      1723459 :                     let suffix = &node.keys[key_off..key_off + suffix_len];
     355      1723459 :                     keybuf[prefix_len..].copy_from_slice(suffix);
     356      1723459 :                     let value = node.value(idx);
     357      1723459 :                     #[allow(clippy::collapsible_if)]
     358      1723459 :                     if node.level == 0 {
     359      1723459 :                         // leaf
     360      1723459 :                         yield (keybuf.clone(), value.to_u64());
     361      1723459 :                     } else {
     362      1723459 :                         stack.push((node_blknum, Some(iter)));
     363      1723459 :                         stack.push((value.to_blknum(), None));
     364      1723459 :                         break;
     365      1723459 :                     }
     366      1723459 :                 }
     367      1723459 :             }
     368      1723459 :         }
     369      1723459 :     }
     370              : 
     371              :     ///
     372              :     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     373              :     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
     374              :     /// backwards)
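                      :     ///
                      :     /// The visitor returns 'true' to continue the scan, or 'false' to stop it. The
                      :     /// function itself returns Ok(true) if the scan reached the end of the tree, and
                      :     /// Ok(false) if it was stopped early by the visitor (or, when scanning backwards,
                      :     /// if there is no key <= 'search_key' at all).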
     375              :     ///
     376      2469525 :     pub async fn visit<V>(
     377      2469525 :         &self,
     378      2469525 :         search_key: &[u8; L],
     379      2469525 :         dir: VisitDirection,
     380      2469525 :         mut visitor: V,
     381      2469525 :         ctx: &RequestContext,
     382      2469525 :     ) -> Result<bool>
     383      2469525 :     where
     384      2469525 :         V: FnMut(&[u8], u64) -> bool,
     385      2469525 :     {
     386      2469525 :         let mut stack = Vec::new();
     387      2469525 :         stack.push((self.root_blk, None));
     388      2469525 :         let block_cursor = self.reader.block_cursor();
     389      7314006 :         while let Some((node_blknum, opt_iter)) = stack.pop() {
     390              :             // Locate the node.
     391      6110802 :             let node_buf = block_cursor
     392      6110802 :                 .read_blk(self.start_blk + node_blknum, ctx)
     393      6110802 :                 .await?;
     394              : 
     395      6110802 :             let node = OnDiskNode::deparse(node_buf.as_ref())?;
     396      6110802 :             let prefix_len = node.prefix_len as usize;
     397      6110802 :             let suffix_len = node.suffix_len as usize;
     398      6110802 : 
     399      6110802 :             assert!(node.num_children > 0);
     400              : 
     401      6110802 :             let mut keybuf = Vec::new();
     402      6110802 :             keybuf.extend(node.prefix);
     403      6110802 :             keybuf.resize(prefix_len + suffix_len, 0);
     404              : 
     405      6110802 :             let mut iter = if let Some(iter) = opt_iter {
     406      1223388 :                 iter
     407      4887414 :             } else if dir == VisitDirection::Forwards {
     408              :                 // Locate the first match
     409      4863258 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     410      1219836 :                     Ok(idx) => idx,
     411      3643422 :                     Err(idx) => {
     412      3643422 :                         if node.level == 0 {
     413              :                             // Imagine that the node contains the following keys:
     414              :                             //
     415              :                             // 1
     416              :                             // 3  <-- idx
     417              :                             // 5
     418              :                             //
      419              :                             // If the search key is '2' and there is no exact match,
     420              :                             // the binary search would return the index of key
     421              :                             // '3'. That's cool, '3' is the first key to return.
     422      1248693 :                             idx
     423              :                         } else {
     424              :                             // This is an internal page, so each key represents a lower
     425              :                             // bound for what's in the child page. If there is no exact
     426              :                             // match, we have to return the *previous* entry.
     427              :                             //
     428              :                             // 1  <-- return this
     429              :                             // 3  <-- idx
     430              :                             // 5
     431      2394729 :                             idx.saturating_sub(1)
     432              :                         }
     433              :                     }
     434              :                 };
     435      4863258 :                 Either::Left(idx..node.num_children.into())
     436              :             } else {
     437        24156 :                 let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
     438        12012 :                     Ok(idx) => {
     439        12012 :                         // Exact match. That's the first entry to return, and walk
     440        12012 :                         // backwards from there.
     441        12012 :                         idx
     442              :                     }
     443        12144 :                     Err(idx) => {
     444              :                         // No exact match. The binary search returned the index of the
     445              :                         // first key that's > search_key. Back off by one, and walk
     446              :                         // backwards from there.
     447        12144 :                         if let Some(idx) = idx.checked_sub(1) {
     448        12120 :                             idx
     449              :                         } else {
     450           24 :                             return Ok(false);
     451              :                         }
     452              :                     }
     453              :                 };
     454        24132 :                 Either::Right((0..=idx).rev())
     455              :             };
     456              : 
     457              :             // idx points to the first match now. Keep going from there
     458     18974502 :             while let Some(idx) = iter.next() {
     459     16547910 :                 let key_off = idx * suffix_len;
     460     16547910 :                 let suffix = &node.keys[key_off..key_off + suffix_len];
     461     16547910 :                 keybuf[prefix_len..].copy_from_slice(suffix);
     462     16547910 :                 let value = node.value(idx);
     463     16547910 :                 #[allow(clippy::collapsible_if)]
     464     16547910 :                 if node.level == 0 {
     465              :                     // leaf
     466     14130021 :                     if !visitor(&keybuf, value.to_u64()) {
     467      1266297 :                         return Ok(false);
     468     12863724 :                     }
     469              :                 } else {
     470      2417889 :                     stack.push((node_blknum, Some(iter)));
     471      2417889 :                     stack.push((value.to_blknum(), None));
     472      2417889 :                     break;
     473              :                 }
     474              :             }
     475              :         }
     476      1203204 :         Ok(true)
     477      2469525 :     }
     478              : 
     479              :     #[allow(dead_code)]
     480           60 :     pub async fn dump(&self, ctx: &RequestContext) -> Result<()> {
     481           60 :         let mut stack = Vec::new();
     482           60 : 
     483           60 :         stack.push((self.root_blk, String::new(), 0, 0, 0));
     484           60 : 
     485           60 :         let block_cursor = self.reader.block_cursor();
     486              : 
     487        36252 :         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
     488        36192 :             let blk = block_cursor.read_blk(self.start_blk + blknum, ctx).await?;
     489        36192 :             let buf: &[u8] = blk.as_ref();
     490        36192 :             let node = OnDiskNode::<L>::deparse(buf)?;
     491              : 
     492        36192 :             if child_idx == 0 {
     493          108 :                 print!("{:indent$}", "", indent = depth * 2);
     494          108 :                 let path_prefix = stack
     495          108 :                     .iter()
     496          108 :                     .map(|(_blknum, path, ..)| path.as_str())
     497          108 :                     .collect::<String>();
     498          108 :                 println!(
     499          108 :                     "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
     500          108 :                     hex::encode(node.prefix),
     501          108 :                     node.suffix_len
     502          108 :                 );
     503        36084 :             }
     504              : 
     505        36192 :             if child_idx + 1 < node.num_children {
     506        36084 :                 let key_off = key_off + node.suffix_len as usize;
     507        36084 :                 stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
     508        36084 :             }
     509        36192 :             let key = &node.keys[key_off..key_off + node.suffix_len as usize];
     510        36192 :             let val = node.value(child_idx as usize);
     511        36192 : 
     512        36192 :             print!("{:indent$}", "", indent = depth * 2 + 2);
     513        36192 :             println!("{}: {}", hex::encode(key), hex::encode(val.0));
     514        36192 : 
     515        36192 :             if node.level > 0 {
     516           48 :                 stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
     517        36144 :             }
     518              :         }
     519           60 :         Ok(())
     520           60 :     }
     521              : }
     522              : 
     523              : pub struct DiskBtreeIterator<'a> {
     524              :     #[allow(clippy::type_complexity)]
     525              :     stream: std::pin::Pin<
     526              :         Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
     527              :     >,
     528              : }
     529              : 
     530              : impl DiskBtreeIterator<'_> {
     531     13942725 :     pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
     532     13942725 :         self.stream.next().await
     533     13942725 :     }
     534              : }
     535              : 
     536              : ///
     537              : /// Public builder object, for creating a new tree.
     538              : ///
     539              : /// Usage: Create a builder object by calling 'new', load all the data into the
     540              : /// tree by calling 'append' for each key-value pair, and then call 'finish'
     541              : ///
     542              : /// 'L' is the key length in bytes
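                      : ///
                      : /// A minimal end-to-end sketch, mirroring the unit tests at the bottom of this file
                      : /// (the `disk` block storage and the `ctx` request context are assumed to be set up
                      : /// as in the tests; real callers supply their own `BlockWriter`/`BlockReader` and
                      : /// context):
                      : ///
                      : /// ```ignore
                      : /// let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
                      : /// writer.append(b"xaaaaa", 0)?;
                      : /// writer.append(b"xaaaba", 1)?;
                      : /// let (root_blk, _writer) = writer.finish()?;
                      : ///
                      : /// // The root block number must be stored out-of-band and passed back in here.
                      : /// let reader = DiskBtreeReader::new(0, root_blk, disk);
                      : /// assert_eq!(reader.get(b"xaaaba", &ctx).await?, Some(1));
                      : /// ```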
     543              : pub struct DiskBtreeBuilder<W, const L: usize>
     544              : where
     545              :     W: BlockWriter,
     546              : {
     547              :     writer: W,
     548              : 
     549              :     ///
     550              :     /// `stack[0]` is the current root page, `stack.last()` is the leaf.
     551              :     ///
     552              :     /// We maintain the length of the stack to be always greater than zero.
     553              :     /// Two exceptions are:
      554              :     /// 1. `Self::flush_node`. The method pushes a new node if it popped the last one.
      555              :     ///    So, because other methods cannot observe the intermediate state, the invariant still holds.
      556              :     /// 2. `Self::finish`. It consumes self and does not return it,
      557              :     ///    which means that this is where the structure is destroyed.
      558              :     ///    Thus a stack of zero length cannot be observed by other methods.
     559              :     stack: Vec<BuildNode<L>>,
     560              : 
     561              :     /// Last key that was appended to the tree. Used to sanity check that append
     562              :     /// is called in increasing key order.
     563              :     last_key: Option<[u8; L]>,
     564              : }
     565              : 
     566              : impl<W, const L: usize> DiskBtreeBuilder<W, L>
     567              : where
     568              :     W: BlockWriter,
     569              : {
     570        12672 :     pub fn new(writer: W) -> Self {
     571        12672 :         DiskBtreeBuilder {
     572        12672 :             writer,
     573        12672 :             last_key: None,
     574        12672 :             stack: vec![BuildNode::new(0)],
     575        12672 :         }
     576        12672 :     }
     577              : 
     578     40491825 :     pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
     579     40491825 :         if value > MAX_VALUE {
     580            0 :             return Err(DiskBtreeError::AppendOverflow(value));
     581     40491825 :         }
     582     40491825 :         if let Some(last_key) = &self.last_key {
     583     40480485 :             if key <= last_key {
     584           12 :                 return Err(DiskBtreeError::UnsortedInput {
     585           12 :                     key: key.as_slice().into(),
     586           12 :                     last_key: last_key.as_slice().into(),
     587           12 :                 });
     588     40480473 :             }
     589        11340 :         }
     590     40491813 :         self.last_key = Some(*key);
     591     40491813 : 
     592     40491813 :         self.append_internal(key, Value::from_u64(value))
     593     40491825 :     }
     594              : 
     595     40566749 :     fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
     596     40566749 :         // Try to append to the current leaf buffer
     597     40566749 :         let last = self
     598     40566749 :             .stack
     599     40566749 :             .last_mut()
     600     40566749 :             .expect("should always have at least one item");
     601     40566749 :         let level = last.level;
     602     40566749 :         if last.push(key, value) {
     603     40424365 :             return Ok(());
     604       142384 :         }
     605       142384 : 
     606       142384 :         // It did not fit. Try to compress, and if it succeeds to make
     607       142384 :         // some room on the node, try appending to it again.
     608       142384 :         #[allow(clippy::collapsible_if)]
     609       142384 :         if last.compress() {
     610        72320 :             if last.push(key, value) {
     611        72272 :                 return Ok(());
     612           48 :             }
     613        70064 :         }
     614              : 
     615              :         // Could not append to the current leaf. Flush it and create a new one.
     616        70112 :         self.flush_node()?;
     617              : 
     618              :         // Replace the node we flushed with an empty one and append the new
     619              :         // key to it.
     620        70112 :         let mut last = BuildNode::new(level);
     621        70112 :         if !last.push(key, value) {
     622            0 :             return Err(DiskBtreeError::FailedToPushToNewLeafNode);
     623        70112 :         }
     624        70112 : 
     625        70112 :         self.stack.push(last);
     626        70112 : 
     627        70112 :         Ok(())
     628     40566749 :     }
     629              : 
     630              :     /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
     631              :     /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
     632              :     /// creates a new root page, increasing the height of the tree.
     633        74936 :     fn flush_node(&mut self) -> Result<()> {
     634        74936 :         // Get the current bottommost node in the stack and flush it to disk.
     635        74936 :         let last = self
     636        74936 :             .stack
     637        74936 :             .pop()
     638        74936 :             .expect("should always have at least one item");
     639        74936 :         let buf = last.pack();
     640        74936 :         let downlink_key = last.first_key();
     641        74936 :         let downlink_ptr = self.writer.write_blk(buf)?;
     642              : 
     643              :         // Append the downlink to the parent. If there is no parent, ie. this was the root page,
     644              :         // create a new root page, increasing the height of the tree.
     645        74936 :         if self.stack.is_empty() {
     646         4824 :             self.stack.push(BuildNode::new(last.level + 1));
     647        70112 :         }
     648        74936 :         self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
     649        74936 :     }
     650              : 
     651              :     ///
     652              :     /// Flushes everything to disk, and returns the block number of the root page.
     653              :     /// The caller must store the root block number "out-of-band", and pass it
     654              :     /// to the DiskBtreeReader::new() when you want to read the tree again.
      655              :     /// to DiskBtreeReader::new() when the tree is read again.
     656              :     /// in the summary header)
     657              :     ///
     658        11040 :     pub fn finish(mut self) -> Result<(u32, W)> {
     659              :         // flush all levels, except the root.
     660        15864 :         while self.stack.len() > 1 {
     661         4824 :             self.flush_node()?;
     662              :         }
     663              : 
     664        11040 :         let root = self
     665        11040 :             .stack
     666        11040 :             .first()
     667        11040 :             .expect("by the check above we left one item there");
     668        11040 :         let buf = root.pack();
     669        11040 :         let root_blknum = self.writer.write_blk(buf)?;
     670              : 
     671        11040 :         Ok((root_blknum, self.writer))
     672        11040 :     }
     673              : 
     674     12285996 :     pub fn borrow_writer(&self) -> &W {
     675     12285996 :         &self.writer
     676     12285996 :     }
     677              : }
     678              : 
     679              : ///
      680              : /// BuildNode represents an incomplete page that we are appending to.
     681              : ///
     682              : #[derive(Clone, Debug)]
     683              : struct BuildNode<const L: usize> {
     684              :     num_children: u16,
     685              :     level: u8,
     686              :     prefix: Vec<u8>,
     687              :     suffix_len: usize,
     688              : 
     689              :     keys: Vec<u8>,
     690              :     values: Vec<u8>,
     691              : 
     692              :     size: usize, // physical size of this node, if it was written to disk like this
     693              : }
     694              : 
     695              : const NODE_SIZE: usize = PAGE_SZ;
     696              : 
     697              : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
     698              : 
     699              : impl<const L: usize> BuildNode<L> {
     700        87608 :     fn new(level: u8) -> Self {
     701        87608 :         BuildNode {
     702        87608 :             num_children: 0,
     703        87608 :             level,
     704        87608 :             prefix: Vec::new(),
     705        87608 :             suffix_len: 0,
     706        87608 :             keys: Vec::new(),
     707        87608 :             values: Vec::new(),
     708        87608 :             size: NODE_HDR_SIZE,
     709        87608 :         }
     710        87608 :     }
     711              : 
     712              :     /// Try to append a key-value pair to this node. Returns 'true' on
     713              :     /// success, 'false' if the page was full or the key was
     714              :     /// incompatible with the prefix of the existing keys.
     715     40709181 :     fn push(&mut self, key: &[u8; L], value: Value) -> bool {
     716     40709181 :         // If we have already performed prefix-compression on the page,
     717     40709181 :         // check that the incoming key has the same prefix.
     718     40709181 :         if self.num_children > 0 {
     719              :             // does the prefix allow it?
     720     40622905 :             if !key.starts_with(&self.prefix) {
     721         1274 :                 return false;
     722     40621631 :             }
     723        86276 :         } else {
     724        86276 :             self.suffix_len = key.len();
     725        86276 :         }
     726              : 
     727              :         // Is the node too full?
     728     40707907 :         if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
     729       141158 :             return false;
     730     40566749 :         }
     731     40566749 : 
     732     40566749 :         // All clear
     733     40566749 :         self.num_children += 1;
     734     40566749 :         self.keys.extend(&key[self.prefix.len()..]);
     735     40566749 :         self.values.extend(value.0);
     736     40566749 : 
     737     40566749 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     738     40566749 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     739              : 
     740     40566749 :         self.size += self.suffix_len + VALUE_SZ;
     741     40566749 : 
     742     40566749 :         true
     743     40709181 :     }
     744              : 
     745              :     ///
     746              :     /// Perform prefix-compression.
     747              :     ///
     748              :     /// Returns 'true' on success, 'false' if no compression was possible.
     749              :     ///
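                      :     /// A sketch of the effect: if the page holds the keys `abc1`, `abc5` and `abc9`
                      :     /// (keys are appended in sorted order, so the common prefix of the first and last
                      :     /// suffix is shared by all of them), the prefix `abc` is moved into `self.prefix`
                      :     /// and the stored suffixes shrink to `1`, `5` and `9`, freeing room on the page.
                      :     ///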
     750       142384 :     fn compress(&mut self) -> bool {
     751       142384 :         let first_suffix = self.first_suffix();
     752       142384 :         let last_suffix = self.last_suffix();
     753       142384 : 
     754       142384 :         // Find the common prefix among all keys
     755       142384 :         let mut prefix_len = 0;
     756      1295934 :         while prefix_len < self.suffix_len {
     757      1295934 :             if first_suffix[prefix_len] != last_suffix[prefix_len] {
     758       142384 :                 break;
     759      1153550 :             }
     760      1153550 :             prefix_len += 1;
     761              :         }
     762       142384 :         if prefix_len == 0 {
     763        70064 :             return false;
     764        72320 :         }
     765        72320 : 
     766        72320 :         // Can compress. Rewrite the keys without the common prefix.
     767        72320 :         self.prefix.extend(&self.keys[..prefix_len]);
     768        72320 : 
     769        72320 :         let mut new_keys = Vec::new();
     770        72320 :         let mut key_off = 0;
     771     19348598 :         while key_off < self.keys.len() {
     772     19276278 :             let next_key_off = key_off + self.suffix_len;
     773     19276278 :             new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
     774     19276278 :             key_off = next_key_off;
     775     19276278 :         }
     776        72320 :         self.keys = new_keys;
     777        72320 :         self.suffix_len -= prefix_len;
     778        72320 : 
     779        72320 :         self.size -= prefix_len * self.num_children as usize;
     780        72320 :         self.size += prefix_len;
     781        72320 : 
     782        72320 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     783        72320 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     784              : 
     785        72320 :         true
     786       142384 :     }
     787              : 
     788              :     ///
     789              :     /// Serialize the node to on-disk format.
     790              :     ///
     791        85976 :     fn pack(&self) -> IoBuffer {
     792        85976 :         assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
     793        85976 :         assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
     794        85976 :         assert!(self.num_children > 0);
     795              : 
     796        85976 :         let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
     797        85976 : 
     798        85976 :         buf.put_u16(self.num_children);
     799        85976 :         buf.put_u8(self.level);
     800        85976 :         buf.put_u8(self.prefix.len() as u8);
     801        85976 :         buf.put_u8(self.suffix_len as u8);
     802        85976 :         buf.put(&self.prefix[..]);
     803        85976 :         buf.put(&self.keys[..]);
     804        85976 :         buf.put(&self.values[..]);
     805        85976 : 
     806        85976 :         assert!(buf.len() == self.size);
     807              : 
     808        85976 :         assert!(buf.len() <= PAGE_SZ);
     809        85976 :         buf.extend_with(0, PAGE_SZ - buf.len());
     810        85976 :         buf.freeze()
     811        85976 :     }
     812              : 
     813       217320 :     fn first_suffix(&self) -> &[u8] {
     814       217320 :         &self.keys[..self.suffix_len]
     815       217320 :     }
     816       142384 :     fn last_suffix(&self) -> &[u8] {
     817       142384 :         &self.keys[self.keys.len() - self.suffix_len..]
     818       142384 :     }
     819              : 
     820              :     /// Return the full first key of the page, including the prefix
     821        74936 :     fn first_key(&self) -> [u8; L] {
     822        74936 :         let mut key = [0u8; L];
     823        74936 :         key[..self.prefix.len()].copy_from_slice(&self.prefix);
     824        74936 :         key[self.prefix.len()..].copy_from_slice(self.first_suffix());
     825        74936 :         key
     826        74936 :     }
     827              : }
     828              : 
     829              : #[cfg(test)]
     830              : pub(crate) mod tests {
     831              :     use std::collections::BTreeMap;
     832              :     use std::sync::atomic::{AtomicUsize, Ordering};
     833              : 
     834              :     use rand::Rng;
     835              : 
     836              :     use super::*;
     837              :     use crate::context::DownloadBehavior;
     838              :     use crate::task_mgr::TaskKind;
     839              :     use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
     840              : 
     841              :     #[derive(Clone, Default)]
     842              :     pub(crate) struct TestDisk {
     843              :         blocks: Vec<IoBuffer>,
     844              :     }
     845              :     impl TestDisk {
     846           60 :         fn new() -> Self {
     847           60 :             Self::default()
     848           60 :         }
     849      6100260 :         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
     850      6100260 :             let mut buf = [0u8; PAGE_SZ];
     851      6100260 :             buf.copy_from_slice(&self.blocks[blknum as usize]);
     852      6100260 :             Ok(std::sync::Arc::new(buf).into())
     853      6100260 :         }
     854              :     }
     855              :     impl BlockReader for TestDisk {
     856      2467161 :         fn block_cursor(&self) -> BlockCursor<'_> {
     857      2467161 :             BlockCursor::new(BlockReaderRef::TestDisk(self))
     858      2467161 :         }
     859              :     }
     860              :     impl BlockWriter for &mut TestDisk {
     861         1293 :         fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
     862         1293 :             let blknum = self.blocks.len();
     863         1293 :             self.blocks.push(buf);
     864         1293 :             Ok(blknum as u32)
     865         1293 :         }
     866              :     }
     867              : 
     868              :     #[tokio::test]
     869           12 :     async fn basic() -> Result<()> {
     870           12 :         let mut disk = TestDisk::new();
     871           12 :         let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
     872           12 : 
     873           12 :         let ctx =
     874           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     875           12 : 
     876           12 :         let all_keys: Vec<&[u8; 6]> = vec![
     877           12 :             b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
     878           12 :         ];
     879           12 :         let all_data: Vec<(&[u8; 6], u64)> = all_keys
     880           12 :             .iter()
     881           12 :             .enumerate()
     882           96 :             .map(|(idx, key)| (*key, idx as u64))
     883           12 :             .collect();
     884           96 :         for (key, val) in all_data.iter() {
     885           96 :             writer.append(key, *val)?;
     886           12 :         }
     887           12 : 
     888           12 :         let (root_offset, _writer) = writer.finish()?;
     889           12 : 
     890           12 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
     891           12 : 
     892           12 :         reader.dump(&ctx).await?;
     893           12 : 
     894           12 :         // Test the `get` function on all the keys.
     895           96 :         for (key, val) in all_data.iter() {
     896           96 :             assert_eq!(reader.get(key, &ctx).await?, Some(*val));
     897           12 :         }
     898           12 :         // And on some keys that don't exist
     899           12 :         assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
     900           12 :         assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
     901           12 :         assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
     902           12 : 
     903           12 :         // Test search with `visit` function
     904           12 :         let search_key = b"xabaaa";
     905           12 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     906           12 :             .iter()
     907           96 :             .filter(|(key, _value)| key[..] >= search_key[..])
     908           60 :             .map(|(key, value)| (key.to_vec(), *value))
     909           12 :             .collect();
     910           12 : 
     911           12 :         let mut data = Vec::new();
     912           12 :         reader
     913           12 :             .visit(
     914           12 :                 search_key,
     915           12 :                 VisitDirection::Forwards,
     916           60 :                 |key, value| {
     917           60 :                     data.push((key.to_vec(), value));
     918           60 :                     true
     919           60 :                 },
     920           12 :                 &ctx,
     921           12 :             )
     922           12 :             .await?;
     923           12 :         assert_eq!(data, expected);
     924           12 : 
     925           12 :         // Test a backwards scan
     926           12 :         let mut expected: Vec<(Vec<u8>, u64)> = all_data
     927           12 :             .iter()
     928           96 :             .filter(|(key, _value)| key[..] <= search_key[..])
     929           48 :             .map(|(key, value)| (key.to_vec(), *value))
     930           12 :             .collect();
     931           12 :         expected.reverse();
     932           12 :         let mut data = Vec::new();
     933           12 :         reader
     934           12 :             .visit(
     935           12 :                 search_key,
     936           12 :                 VisitDirection::Backwards,
     937           48 :                 |key, value| {
     938           48 :                     data.push((key.to_vec(), value));
     939           48 :                     true
     940           48 :                 },
     941           12 :                 &ctx,
     942           12 :             )
     943           12 :             .await?;
     944           12 :         assert_eq!(data, expected);
     945           12 : 
     946           12 :         // Backward scan where nothing matches
     947           12 :         reader
     948           12 :             .visit(
     949           12 :                 b"aaaaaa",
     950           12 :                 VisitDirection::Backwards,
     951           12 :                 |key, value| {
     952            0 :                     panic!("found unexpected key {}: {}", hex::encode(key), value);
     953           12 :                 },
     954           12 :                 &ctx,
     955           12 :             )
     956           12 :             .await?;
     957           12 : 
     958           12 :         // Full scan
     959           12 :         let expected: Vec<(Vec<u8>, u64)> = all_data
     960           12 :             .iter()
     961           96 :             .map(|(key, value)| (key.to_vec(), *value))
     962           12 :             .collect();
     963           12 :         let mut data = Vec::new();
     964           12 :         reader
     965           12 :             .visit(
     966           12 :                 &[0u8; 6],
     967           12 :                 VisitDirection::Forwards,
     968           96 :                 |key, value| {
     969           96 :                     data.push((key.to_vec(), value));
     970           96 :                     true
     971           96 :                 },
     972           12 :                 &ctx,
     973           12 :             )
     974           12 :             .await?;
     975           12 :         assert_eq!(data, expected);
     976           12 : 
     977           12 :         Ok(())
     978           12 :     }
     979              : 
     980              :     #[tokio::test]
     981           12 :     async fn lots_of_keys() -> Result<()> {
     982           12 :         let mut disk = TestDisk::new();
     983           12 :         let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
     984           12 :         let ctx =
     985           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     986           12 : 
     987           12 :         const NUM_KEYS: u64 = 1000;
     988           12 : 
     989           12 :         let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
     990           12 : 
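                      :         // Keys are the odd integers 1, 3, 5, ...; the search loop below also probes
                      :         // the even values in between, which are absent from the tree.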
     991        12012 :         for idx in 0..NUM_KEYS {
     992        12000 :             let key_int: u64 = 1 + idx * 2;
     993        12000 :             let key = u64::to_be_bytes(key_int);
     994        12000 :             writer.append(&key, idx)?;
     995           12 : 
     996        12000 :             all_data.insert(key_int, idx);
     997           12 :         }
     998           12 : 
     999           12 :         let (root_offset, _writer) = writer.finish()?;
    1000           12 : 
    1001           12 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1002           12 : 
    1003           12 :         reader.dump(&ctx).await?;
    1004           12 : 
    1005           12 :         use std::sync::Mutex;
    1006           12 : 
    1007           12 :         let result = Mutex::new(Vec::new());
    1008           12 :         let limit: AtomicUsize = AtomicUsize::new(10);
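                      :         // Shared visit callback: pushes each visited (key, value) pair into `result`
                      :         // and returns false once `limit` entries have been collected, stopping the
                      :         // scan early; `limit` is raised to usize::MAX for the full scans below.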
    1009       502920 :         let take_ten = |key: &[u8], value: u64| {
    1010       502920 :             let mut keybuf = [0u8; 8];
    1011       502920 :             keybuf.copy_from_slice(key);
    1012       502920 :             let key_int = u64::from_be_bytes(keybuf);
    1013       502920 : 
    1014       502920 :             let mut result = result.lock().unwrap();
    1015       502920 :             result.push((key_int, value));
    1016       502920 : 
    1017       502920 :             // keep going until we have `limit` matches (10 here; unlimited for the full scans)
    1018       502920 :             result.len() < limit.load(Ordering::Relaxed)
    1019       502920 :         };
    1020           12 : 
    1021        24120 :         for search_key_int in 0..(NUM_KEYS * 2 + 10) {
    1022        24120 :             let search_key = u64::to_be_bytes(search_key_int);
    1023        24120 :             assert_eq!(
    1024        24120 :                 reader.get(&search_key, &ctx).await?,
    1025        24120 :                 all_data.get(&search_key_int).cloned()
    1026           12 :             );
    1027           12 : 
    1028           12 :             // Test a forward scan starting with this key
    1029        24120 :             result.lock().unwrap().clear();
    1030        24120 :             reader
    1031        24120 :                 .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1032        24120 :                 .await?;
    1033        24120 :             let expected = all_data
    1034        24120 :                 .range(search_key_int..)
    1035        24120 :                 .take(10)
    1036       238920 :                 .map(|(&key, &val)| (key, val))
    1037        24120 :                 .collect::<Vec<(u64, u64)>>();
    1038        24120 :             assert_eq!(*result.lock().unwrap(), expected);
    1039           12 : 
    1040           12 :             // And a backwards scan
    1041        24120 :             result.lock().unwrap().clear();
    1042        24120 :             reader
    1043        24120 :                 .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1044        24120 :                 .await?;
    1045        24120 :             let expected = all_data
    1046        24120 :                 .range(..=search_key_int)
    1047        24120 :                 .rev()
    1048        24120 :                 .take(10)
    1049       240000 :                 .map(|(&key, &val)| (key, val))
    1050        24120 :                 .collect::<Vec<(u64, u64)>>();
    1051        24120 :             assert_eq!(*result.lock().unwrap(), expected);
    1052           12 :         }
    1053           12 : 
    1054           12 :         // full scan, forwards
    1055           12 :         let search_key = u64::to_be_bytes(0);
    1056           12 :         limit.store(usize::MAX, Ordering::Relaxed);
    1057           12 :         result.lock().unwrap().clear();
    1058           12 :         reader
    1059           12 :             .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
    1060           12 :             .await?;
    1061           12 :         let expected = all_data
    1062           12 :             .iter()
    1063        12000 :             .map(|(&key, &val)| (key, val))
    1064           12 :             .collect::<Vec<(u64, u64)>>();
    1065           12 :         assert_eq!(*result.lock().unwrap(), expected);
    1066           12 : 
    1067           12 :         // full scan, backwards
    1068           12 :         let search_key = u64::to_be_bytes(u64::MAX);
    1069           12 :         limit.store(usize::MAX, Ordering::Relaxed);
    1070           12 :         result.lock().unwrap().clear();
    1071           12 :         reader
    1072           12 :             .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
    1073           12 :             .await?;
    1074           12 :         let expected = all_data
    1075           12 :             .iter()
    1076           12 :             .rev()
    1077        12000 :             .map(|(&key, &val)| (key, val))
    1078           12 :             .collect::<Vec<(u64, u64)>>();
    1079           12 :         assert_eq!(*result.lock().unwrap(), expected);
    1080           12 : 
    1081           12 :         Ok(())
    1082           12 :     }
    1083              : 
    1084              :     #[tokio::test]
    1085           12 :     async fn random_data() -> Result<()> {
    1086           12 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1087           12 : 
    1088           12 :         // Generate random keys with exponential distribution, to
    1089           12 :         // exercise the prefix compression
    1090           12 :         const NUM_KEYS: usize = 100000;
    1091           12 :         let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
    1092      1200012 :         for idx in 0..NUM_KEYS {
    1093      1200000 :             let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
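                      :             // Inverse-transform sampling: -ln(u) of a uniform u in (0, 1) is
                      :             // exponentially distributed.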
    1094      1200000 :             let t = -(f64::ln(u));
    1095      1200000 :             let key_int = (t * 1000000.0) as u128;
    1096      1200000 : 
    1097      1200000 :             all_data.insert(key_int, idx as u64);
    1098      1200000 :         }
    1099           12 : 
    1100           12 :         // Build a tree from it
    1101           12 :         let mut disk = TestDisk::new();
    1102           12 :         let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
    1103           12 : 
    1104      1170489 :         for (&key, &val) in all_data.iter() {
    1105      1170489 :             writer.append(&u128::to_be_bytes(key), val)?;
    1106           12 :         }
    1107           12 :         let (root_offset, _writer) = writer.finish()?;
    1108           12 : 
    1109           12 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1110           12 : 
    1111           12 :         // Test get() operation on all the keys
    1112      1170489 :         for (&key, &val) in all_data.iter() {
    1113      1170489 :             let search_key = u128::to_be_bytes(key);
    1114      1170489 :             assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
    1115           12 :         }
    1116           12 : 
    1117           12 :         // Test get() operations on random keys, most of which will not exist
    1118      1200012 :         for _ in 0..100000 {
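                      :             // `r#gen` is a raw identifier; plain `gen` is reserved as a keyword in the Rust 2024 edition.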
    1119      1200000 :             let key_int = rand::thread_rng().r#gen::<u128>();
    1120      1200000 :             let search_key = u128::to_be_bytes(key_int);
    1121      1200000 :             assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
    1122           12 :         }
    1123           12 : 
    1124           12 :         // Test boundary cases
    1125           12 :         assert!(
    1126           12 :             reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
    1127           12 :                 == all_data.get(&u128::MIN).cloned()
    1128           12 :         );
    1129           12 :         assert!(
    1130           12 :             reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
    1131           12 :                 == all_data.get(&u128::MAX).cloned()
    1132           12 :         );
    1133           12 : 
    1134           12 :         // Test iterator and get_stream API
    1135           12 :         let mut iter = reader.iter(&[0; 16], &ctx);
    1136           12 :         let mut cnt = 0;
    1137      1170501 :         while let Some(res) = iter.next().await {
    1138      1170489 :             let (key, val) = res?;
    1139      1170489 :             let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
    1140      1170489 :             assert_eq!(val, *all_data.get(&key).unwrap());
    1141      1170489 :             cnt += 1;
    1142           12 :         }
    1143           12 :         assert_eq!(cnt, all_data.len());
    1144           12 : 
    1145           12 :         Ok(())
    1146           12 :     }
    1147              : 
    1148              :     #[test]
    1149           12 :     fn unsorted_input() {
    1150           12 :         let mut disk = TestDisk::new();
    1151           12 :         let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
    1152           12 : 
    1153           12 :         let _ = writer.append(b"ba", 1);
    1154           12 :         let _ = writer.append(b"bb", 2);
    1155           12 :         let err = writer.append(b"aa", 3).expect_err("should've failed");
    1156           12 :         match err {
    1157           12 :             DiskBtreeError::UnsortedInput { key, last_key } => {
    1158           12 :                 assert_eq!(key.as_ref(), b"aa".as_slice());
    1159           12 :                 assert_eq!(last_key.as_ref(), b"bb".as_slice());
    1160              :             }
    1161            0 :             _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    1162              :         }
    1163           12 :     }
    1164              : 
    1165              :     ///
    1166              :     /// This test contains a particular data set, see disk_btree_test_data.rs
    1167              :     ///
    1168              :     #[tokio::test]
    1169           12 :     async fn particular_data() -> Result<()> {
    1170           12 :         // Build a tree from it
    1171           12 :         let mut disk = TestDisk::new();
    1172           12 :         let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
    1173           12 :         let ctx =
    1174           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1175           12 : 
    1176        24012 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1177        24000 :             writer.append(&key, val)?;
    1178           12 :         }
    1179           12 :         let (root_offset, writer) = writer.finish()?;
    1180           12 : 
    1181           12 :         println!("SIZE: {} blocks", writer.blocks.len());
    1182           12 : 
    1183           12 :         let reader = DiskBtreeReader::new(0, root_offset, disk);
    1184           12 : 
    1185           12 :         // Test get() operation on all the keys
    1186        24012 :         for (key, val) in disk_btree_test_data::TEST_DATA {
    1187        24000 :             assert_eq!(reader.get(&key, &ctx).await?, Some(val));
    1188           12 :         }
    1189           12 : 
    1190           12 :         // Test full scan
    1191           12 :         let mut count = 0;
    1192           12 :         reader
    1193           12 :             .visit(
    1194           12 :                 &[0u8; 26],
    1195           12 :                 VisitDirection::Forwards,
    1196        24000 :                 |_key, _value| {
    1197        24000 :                     count += 1;
    1198        24000 :                     true
    1199        24000 :                 },
    1200           12 :                 &ctx,
    1201           12 :             )
    1202           12 :             .await?;
    1203           12 :         assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
    1204           12 : 
    1205           12 :         reader.dump(&ctx).await?;
    1206           12 : 
    1207           12 :         Ok(())
    1208           12 :     }
    1209              : }
    1210              : 
    1211              : #[cfg(test)]
    1212              : #[path = "disk_btree_test_data.rs"]
    1213              : mod disk_btree_test_data;
        

Generated by: LCOV version 2.1-beta