Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
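//! A minimal usage sketch (illustrative only; it assumes an async context and a
//! `BlockWriter`/`BlockReader` pair such as the crate's test-only `TestDisk`):
//!
//! ```ignore
//! let mut disk = TestDisk::new();
//! let mut builder = DiskBtreeBuilder::<_, 8>::new(&mut disk);
//! // Keys must be appended in strictly increasing order.
//! builder.append(&u64::to_be_bytes(1), 100)?;
//! builder.append(&u64::to_be_bytes(2), 200)?;
//! let (root_blk, _writer) = builder.finish()?;
//!
//! let reader = DiskBtreeReader::<_, 8>::new(0, root_blk, disk);
//! assert_eq!(reader.get(&u64::to_be_bytes(2)).await?, Some(200));
//! ```
//!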
21 : use byteorder::{ReadBytesExt, BE};
22 : use bytes::{BufMut, Bytes, BytesMut};
23 : use either::Either;
24 : use hex;
25 : use std::{cmp::Ordering, io, result};
26 : use thiserror::Error;
27 : use tracing::error;
28 :
29 : use crate::tenant::block_io::{BlockReader, BlockWriter};
30 :
31 : // The fixed width of a value stored in the B-tree, in bytes. 5 bytes is enough currently.
32 : pub const VALUE_SZ: usize = 5;
33 : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
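// MAX_VALUE is 2^39 - 1: a value occupies 40 bits on disk, and the most
// significant bit is reserved as a tag marking a child-block downlink (see
// Value::from_blknum below), which leaves 39 bits for plain values.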
34 :
35 : #[allow(dead_code)]
36 : pub const PAGE_SZ: usize = 8192;
37 :
38 0 : #[derive(Clone, Copy, Debug)]
39 : struct Value([u8; VALUE_SZ]);
40 :
41 : impl Value {
42 281076334 : fn from_slice(slice: &[u8]) -> Value {
43 281076334 : let mut b = [0u8; VALUE_SZ];
44 281076334 : b.copy_from_slice(slice);
45 281076334 : Value(b)
46 281076334 : }
47 :
48 91821560 : fn from_u64(x: u64) -> Value {
49 91821560 : assert!(x <= 0x007f_ffff_ffff);
50 91821560 : Value([
51 91821560 : (x >> 32) as u8,
52 91821560 : (x >> 24) as u8,
53 91821560 : (x >> 16) as u8,
54 91821560 : (x >> 8) as u8,
55 91821560 : x as u8,
56 91821560 : ])
57 91821560 : }
58 :
59 177124 : fn from_blknum(x: u32) -> Value {
60 177124 : Value([
61 177124 : 0x80,
62 177124 : (x >> 24) as u8,
63 177124 : (x >> 16) as u8,
64 177124 : (x >> 8) as u8,
65 177124 : x as u8,
66 177124 : ])
67 177124 : }
68 :
69 : #[allow(dead_code)]
70 0 : fn is_offset(self) -> bool {
71 0 : self.0[0] & 0x80 != 0
72 0 : }
73 :
74 251205646 : fn to_u64(self) -> u64 {
75 251205646 : let b = &self.0;
76 251205646 : (b[0] as u64) << 32
77 251205646 : | (b[1] as u64) << 24
78 251205646 : | (b[2] as u64) << 16
79 251205646 : | (b[3] as u64) << 8
80 251205646 : | b[4] as u64
81 251205646 : }
82 :
83 29867676 : fn to_blknum(self) -> u32 {
84 29867676 : let b = &self.0;
85 29867676 : assert!(b[0] == 0x80);
86 29867676 : (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
87 29867676 : }
88 : }
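// A quick sketch of the 5-byte encoding, for illustration only (not part of
// the test suite):
//
//     let v = Value::from_u64(0x0123456789);
//     assert_eq!(v.0, [0x01, 0x23, 0x45, 0x67, 0x89]);
//     assert_eq!(v.to_u64(), 0x0123456789);
//
//     let d = Value::from_blknum(7);
//     assert_eq!(d.0, [0x80, 0x00, 0x00, 0x00, 0x07]);
//     assert_eq!(d.to_blknum(), 7);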
89 :
90 0 : #[derive(Error, Debug)]
91 : pub enum DiskBtreeError {
92 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
93 : AppendOverflow(u64),
94 :
95 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
96 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
97 :
98 : #[error("Could not push to new leaf node")]
99 : FailedToPushToNewLeafNode,
100 :
101 : #[error("IoError: {0}")]
102 : Io(#[from] io::Error),
103 : }
104 :
105 : pub type Result<T> = result::Result<T, DiskBtreeError>;
106 :
107 : /// This is the on-disk representation.
108 : struct OnDiskNode<'a, const L: usize> {
109 : // Fixed-width fields
110 : num_children: u16,
111 : level: u8,
112 : prefix_len: u8,
113 : suffix_len: u8,
114 :
115 : // Variable-length fields. These are stored on-disk after the fixed-width
116 : // fields, in this order. In the in-memory representation, these point to
117 : // the right parts in the page buffer.
118 : prefix: &'a [u8],
119 : keys: &'a [u8],
120 : values: &'a [u8],
121 : }
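// On-disk layout of a node page, as written by BuildNode::pack and read back
// by OnDiskNode::deparse (integers are big-endian, fields packed back to back):
//
//     num_children: u16
//     level:        u8   (0 = leaf, >0 = internal node)
//     prefix_len:   u8
//     suffix_len:   u8
//     prefix:       prefix_len bytes, shared by every key on the page
//     keys:         num_children * suffix_len bytes (sorted key suffixes)
//     values:       num_children * VALUE_SZ bytes
//
// The remainder of the PAGE_SZ page is zero padding.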
122 :
123 : impl<'a, const L: usize> OnDiskNode<'a, L> {
124 : ///
125 : /// Interpret a PAGE_SZ page as a node.
126 : ///
127 76221254 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
128 76221254 : let mut cursor = std::io::Cursor::new(buf);
129 76221254 : let num_children = cursor.read_u16::<BE>()?;
130 76221254 : let level = cursor.read_u8()?;
131 76221254 : let prefix_len = cursor.read_u8()?;
132 76221254 : let suffix_len = cursor.read_u8()?;
133 :
134 76221254 : let mut off = cursor.position();
135 76221254 : let prefix_off = off as usize;
136 76221254 : off += prefix_len as u64;
137 76221254 :
138 76221254 : let keys_off = off as usize;
139 76221254 : let keys_len = num_children as usize * suffix_len as usize;
140 76221254 : off += keys_len as u64;
141 76221254 :
142 76221254 : let values_off = off as usize;
143 76221254 : let values_len = num_children as usize * VALUE_SZ;
144 76221254 : //off += values_len as u64;
145 76221254 :
146 76221254 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
147 76221254 : let keys = &buf[keys_off..keys_off + keys_len];
148 76221254 : let values = &buf[values_off..values_off + values_len];
149 76221254 :
150 76221254 : Ok(OnDiskNode {
151 76221254 : num_children,
152 76221254 : level,
153 76221254 : prefix_len,
154 76221254 : suffix_len,
155 76221254 : prefix,
156 76221254 : keys,
157 76221254 : values,
158 76221254 : })
159 76221254 : }
160 :
161 : ///
162 : /// Read a value at 'idx'
163 : ///
164 281076760 : fn value(&self, idx: usize) -> Value {
165 281076760 : let value_off = idx * VALUE_SZ;
166 281076760 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
167 281076760 : Value::from_slice(value_slice)
168 281076760 : }
169 :
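/// Binary-search this node's keys for 'search_key'.
///
/// 'keybuf' is scratch space of prefix_len + suffix_len bytes whose first
/// prefix_len bytes already hold the page's key prefix; each probe copies the
/// candidate suffix into the tail so the full key can be compared against
/// 'search_key'. Returns Ok(idx) on an exact match, or Err(idx) with the
/// insertion point, following the slice::binary_search convention.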
170 75773660 : fn binary_search(
171 75773660 : &self,
172 75773660 : search_key: &[u8; L],
173 75773660 : keybuf: &mut [u8],
174 75773660 : ) -> result::Result<usize, usize> {
175 75773660 : let mut size = self.num_children as usize;
176 75773660 : let mut low = 0;
177 75773660 : let mut high = size;
178 567082414 : while low < high {
179 492104295 : let mid = low + size / 2;
180 492104295 :
181 492104295 : let key_off = mid * self.suffix_len as usize;
182 492104295 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
183 492104295 : // Does this match?
184 492104295 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
185 492104295 :
186 492104295 : let cmp = keybuf[..].cmp(search_key);
187 492104295 :
188 492104295 : if cmp == Ordering::Less {
189 188113573 : low = mid + 1;
190 303990722 : } else if cmp == Ordering::Greater {
191 303195181 : high = mid;
192 303195181 : } else {
193 795541 : return Ok(mid);
194 : }
195 491308754 : size = high - low;
196 : }
197 74978119 : Err(low)
198 75773660 : }
199 : }
200 :
201 : ///
202 : /// Public reader object, to search the tree.
203 : ///
204 : pub struct DiskBtreeReader<R, const L: usize>
205 : where
206 : R: BlockReader,
207 : {
208 : start_blk: u32,
209 : root_blk: u32,
210 : reader: R,
211 : }
212 :
213 75773588 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
214 : pub enum VisitDirection {
215 : Forwards,
216 : Backwards,
217 : }
218 :
219 : impl<R, const L: usize> DiskBtreeReader<R, L>
220 : where
221 : R: BlockReader,
222 : {
223 45700352 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
224 45700352 : DiskBtreeReader {
225 45700352 : start_blk,
226 45700352 : root_blk,
227 45700352 : reader,
228 45700352 : }
229 45700352 : }
230 :
231 : ///
232 : /// Read the value for the given key. Returns the value, or None if it doesn't exist.
233 : ///
234 860509 : pub async fn get(&self, search_key: &[u8; L]) -> Result<Option<u64>> {
235 860509 : let mut result: Option<u64> = None;
236 860509 : self.visit(search_key, VisitDirection::Forwards, |key, value| {
237 760497 : if key == search_key {
238 759492 : result = Some(value);
239 759492 : }
240 760497 : false
241 860509 : })
242 2478 : .await?;
243 860509 : Ok(result)
244 860509 : }
245 :
246 : ///
247 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
248 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
249 : /// backwards)
250 : ///
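/// A sketch of a forward scan that collects the first three entries at or
/// after 'search_key' (illustrative only; returning false from the closure
/// stops the scan early):
///
/// ```ignore
/// let mut hits = Vec::new();
/// reader
///     .visit(&search_key, VisitDirection::Forwards, |key, value| {
///         hits.push((key.to_vec(), value));
///         hits.len() < 3
///     })
///     .await?;
/// ```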
251 45905965 : pub async fn visit<V>(
252 45905965 : &self,
253 45905965 : search_key: &[u8; L],
254 45905965 : dir: VisitDirection,
255 45905965 : mut visitor: V,
256 45905965 : ) -> Result<bool>
257 45905965 : where
258 45905965 : V: FnMut(&[u8], u64) -> bool,
259 45905965 : {
260 45905965 : let mut stack = Vec::new();
261 45905965 : stack.push((self.root_blk, None));
262 45905965 : let block_cursor = self.reader.block_cursor();
263 76333529 : while let Some((node_blknum, opt_iter)) = stack.pop() {
264 : // Locate the node.
265 76218238 : let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
266 :
267 76218238 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
268 76218238 : let prefix_len = node.prefix_len as usize;
269 76218238 : let suffix_len = node.suffix_len as usize;
270 76218238 :
271 76218238 : assert!(node.num_children > 0);
272 :
273 76218238 : let mut keybuf = Vec::new();
274 76218238 : keybuf.extend(node.prefix);
275 76218238 : keybuf.resize(prefix_len + suffix_len, 0);
276 :
277 76218238 : let mut iter = if let Some(iter) = opt_iter {
278 444578 : iter
279 75773660 : } else if dir == VisitDirection::Forwards {
280 : // Locate the first match
281 1552239 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
282 761027 : Ok(idx) => idx,
283 791212 : Err(idx) => {
284 791212 : if node.level == 0 {
285 : // Imagine that the node contains the following keys:
286 : //
287 : // 1
288 : // 3 <-- idx
289 : // 5
290 : //
291 : // If the search key is '2' and there is no exact match,
292 : // the binary search would return the index of key
293 : // '3'. That's cool, '3' is the first key to return.
294 189230 : idx
295 : } else {
296 : // This is an internal page, so each key represents a lower
297 : // bound for what's in the child page. If there is no exact
298 : // match, we have to return the *previous* entry.
299 : //
300 : // 1 <-- return this
301 : // 3 <-- idx
302 : // 5
303 601982 : idx.saturating_sub(1)
304 : }
305 : }
306 : };
307 1552239 : Either::Left(idx..node.num_children.into())
308 : } else {
309 74221421 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
310 34514 : Ok(idx) => {
311 34514 : // Exact match. That's the first entry to return, and walk
312 34514 : // backwards from there.
313 34514 : idx
314 : }
315 74186907 : Err(idx) => {
316 : // No exact match. The binary search returned the index of the
317 : // first key that's > search_key. Back off by one, and walk
318 : // backwards from there.
319 74186907 : if let Some(idx) = idx.checked_sub(1) {
320 58705990 : idx
321 : } else {
322 15480917 : return Ok(false);
323 : }
324 : }
325 : };
326 58740504 : Either::Right((0..=idx).rev())
327 : };
328 :
329 : // idx points to the first match now. Keep going from there
330 281633613 : while let Some(idx) = iter.next() {
331 281073744 : let key_off = idx * suffix_len;
332 281073744 : let suffix = &node.keys[key_off..key_off + suffix_len];
333 281073744 : keybuf[prefix_len..].copy_from_slice(suffix);
334 281073744 : let value = node.value(idx);
335 281073744 : #[allow(clippy::collapsible_if)]
336 281073744 : if node.level == 0 {
337 : // leaf
338 251206049 : if !visitor(&keybuf, value.to_u64()) {
339 30309757 : return Ok(false);
340 220896292 : }
341 : } else {
342 29867695 : stack.push((node_blknum, Some(iter)));
343 29867695 : stack.push((value.to_blknum(), None));
344 29867695 : break;
345 : }
346 : }
347 : }
348 115291 : Ok(true)
349 45905965 : }
350 :
351 : #[allow(dead_code)]
352 5 : pub async fn dump(&self) -> Result<()> {
353 5 : let mut stack = Vec::new();
354 5 :
355 5 : stack.push((self.root_blk, String::new(), 0, 0, 0));
356 5 :
357 5 : let block_cursor = self.reader.block_cursor();
358 :
359 3021 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
360 3016 : let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
361 3016 : let buf: &[u8] = blk.as_ref();
362 3016 : let node = OnDiskNode::<L>::deparse(buf)?;
363 :
364 3016 : if child_idx == 0 {
365 9 : print!("{:indent$}", "", indent = depth * 2);
366 9 : let path_prefix = stack
367 9 : .iter()
368 9 : .map(|(_blknum, path, ..)| path.as_str())
369 9 : .collect::<String>();
370 9 : println!(
371 9 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
372 9 : hex::encode(node.prefix),
373 9 : node.suffix_len
374 9 : );
375 3007 : }
376 :
377 3016 : if child_idx + 1 < node.num_children {
378 3007 : let key_off = key_off + node.suffix_len as usize;
379 3007 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
380 3007 : }
381 3016 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
382 3016 : let val = node.value(child_idx as usize);
383 3016 :
384 3016 : print!("{:indent$}", "", indent = depth * 2 + 2);
385 3016 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
386 3016 :
387 3016 : if node.level > 0 {
388 4 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
389 3012 : }
390 : }
391 5 : Ok(())
392 5 : }
393 : }
394 :
395 : ///
396 : /// Public builder object, for creating a new tree.
397 : ///
398 : /// Usage: Create a builder object by calling 'new', load all the data into the
399 : /// tree by calling 'append' for each key-value pair in increasing key order, and then call 'finish'
400 : ///
401 : /// 'L' is the key length in bytes
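///
/// A sketch of the error cases (illustrative only, using the crate's test-only
/// `TestDisk` as the block writer):
///
/// ```ignore
/// let mut disk = TestDisk::new();
/// let mut builder = DiskBtreeBuilder::<_, 2>::new(&mut disk);
/// builder.append(b"bb", 1)?;
/// // Keys must arrive in strictly increasing order.
/// assert!(matches!(
///     builder.append(b"aa", 2),
///     Err(DiskBtreeError::UnsortedInput { .. })
/// ));
/// // Values must fit in 39 bits.
/// assert!(matches!(
///     builder.append(b"cc", MAX_VALUE + 1),
///     Err(DiskBtreeError::AppendOverflow(_))
/// ));
/// ```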
402 : pub struct DiskBtreeBuilder<W, const L: usize>
403 : where
404 : W: BlockWriter,
405 : {
406 : writer: W,
407 :
408 : ///
409 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
410 : ///
411 : /// We maintain the invariant that the stack is never empty. Two exceptions:
412 : /// 1. `Self::flush_node`: it pops the bottommost node, but pushes a new root
413 : /// in its place if that was the last one, so other methods never observe
414 : /// the intermediate state and the invariant still holds.
415 : /// 2. `Self::finish`: it consumes `self` and does not hand it back, so this
416 : /// is where the structure is destroyed, and a zero-length stack can never
417 : /// be observed by other methods.
418 : stack: Vec<BuildNode<L>>,
419 :
420 : /// Last key that was appended to the tree. Used to sanity check that append
421 : /// is called in increasing key order.
422 : last_key: Option<[u8; L]>,
423 : }
424 :
425 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
426 : where
427 : W: BlockWriter,
428 : {
429 16632 : pub fn new(writer: W) -> Self {
430 16632 : DiskBtreeBuilder {
431 16632 : writer,
432 16632 : last_key: None,
433 16632 : stack: vec![BuildNode::new(0)],
434 16632 : }
435 16632 : }
436 :
437 91821788 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
438 91821788 : if value > MAX_VALUE {
439 0 : return Err(DiskBtreeError::AppendOverflow(value));
440 91821788 : }
441 91821788 : if let Some(last_key) = &self.last_key {
442 91805156 : if key <= last_key {
443 1 : return Err(DiskBtreeError::UnsortedInput {
444 1 : key: key.as_slice().into(),
445 1 : last_key: last_key.as_slice().into(),
446 1 : });
447 91805155 : }
448 16632 : }
449 91821787 : self.last_key = Some(*key);
450 91821787 :
451 91821787 : self.append_internal(key, Value::from_u64(value))
452 91821788 : }
453 :
454 91998911 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
455 91998911 : // Try to append to the current leaf buffer
456 91998911 : let last = self
457 91998911 : .stack
458 91998911 : .last_mut()
459 91998911 : .expect("should always have at least one item");
460 91998911 : let level = last.level;
461 91998911 : if last.push(key, value) {
462 91663221 : return Ok(());
463 335690 : }
464 335690 :
465 335690 : // It did not fit. Try to compress, and if it succeeds to make
466 335690 : // some room on the node, try appending to it again.
467 335690 : #[allow(clippy::collapsible_if)]
468 335690 : if last.compress() {
469 169291 : if last.push(key, value) {
470 169226 : return Ok(());
471 65 : }
472 166399 : }
473 :
474 : // Could not append to the current leaf. Flush it and create a new one.
475 166464 : self.flush_node()?;
476 :
477 : // Replace the node we flushed with an empty one and append the new
478 : // key to it.
479 166464 : let mut last = BuildNode::new(level);
480 166464 : if !last.push(key, value) {
481 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
482 166464 : }
483 166464 :
484 166464 : self.stack.push(last);
485 166464 :
486 166464 : Ok(())
487 91998911 : }
488 :
489 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
490 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
491 : /// creates a new root page, increasing the height of the tree.
492 177124 : fn flush_node(&mut self) -> Result<()> {
493 177124 : // Get the current bottommost node in the stack and flush it to disk.
494 177124 : let last = self
495 177124 : .stack
496 177124 : .pop()
497 177124 : .expect("should always have at least one item");
498 177124 : let buf = last.pack();
499 177124 : let downlink_key = last.first_key();
500 177124 : let downlink_ptr = self.writer.write_blk(buf)?;
501 :
502 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
503 : // create a new root page, increasing the height of the tree.
504 177124 : if self.stack.is_empty() {
505 10667 : self.stack.push(BuildNode::new(last.level + 1));
506 166457 : }
507 177124 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
508 177124 : }
509 :
510 : ///
511 : /// Flushes everything to disk, and returns the block number of the root page.
512 : /// The caller must store the root block number "out-of-band", and pass it
513 : /// to DiskBtreeReader::new() when you want to read the tree again.
514 : /// (In the image and delta layers, it is stored in the beginning of the file,
515 : /// in the summary header)
516 : ///
517 16624 : pub fn finish(mut self) -> Result<(u32, W)> {
518 : // flush all levels, except the root.
519 27284 : while self.stack.len() > 1 {
520 10660 : self.flush_node()?;
521 : }
522 :
523 16624 : let root = self
524 16624 : .stack
525 16624 : .first()
526 16624 : .expect("by the check above we left one item there");
527 16624 : let buf = root.pack();
528 16624 : let root_blknum = self.writer.write_blk(buf)?;
529 :
530 16624 : Ok((root_blknum, self.writer))
531 16624 : }
532 :
533 1751920 : pub fn borrow_writer(&self) -> &W {
534 1751920 : &self.writer
535 1751920 : }
536 : }
537 :
538 : ///
539 : /// BuildNode represents an incomplete page that we are appending to.
540 : ///
541 0 : #[derive(Clone, Debug)]
542 : struct BuildNode<const L: usize> {
543 : num_children: u16,
544 : level: u8,
545 : prefix: Vec<u8>,
546 : suffix_len: usize,
547 :
548 : keys: Vec<u8>,
549 : values: Vec<u8>,
550 :
551 : size: usize, // physical size of this node, if it was written to disk like this
552 : }
553 :
554 : const NODE_SIZE: usize = PAGE_SZ;
555 :
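// The fixed-width node header: num_children (u16) plus one byte each for
// level, prefix_len and suffix_len, matching what OnDiskNode::deparse reads.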
556 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
557 :
558 : impl<const L: usize> BuildNode<L> {
559 193763 : fn new(level: u8) -> Self {
560 193763 : BuildNode {
561 193763 : num_children: 0,
562 193763 : level,
563 193763 : prefix: Vec::new(),
564 193763 : suffix_len: 0,
565 193763 : keys: Vec::new(),
566 193763 : values: Vec::new(),
567 193763 : size: NODE_HDR_SIZE,
568 193763 : }
569 193763 : }
570 :
571 : /// Try to append a key-value pair to this node. Returns 'true' on
572 : /// success, 'false' if the page was full or the key was
573 : /// incompatible with the prefix of the existing keys.
574 92334666 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
575 92334666 : // If we have already performed prefix-compression on the page,
576 92334666 : // check that the incoming key has the same prefix.
577 92334666 : if self.num_children > 0 {
578 : // does the prefix allow it?
579 92140903 : if !key.starts_with(&self.prefix) {
580 48200 : return false;
581 92092703 : }
582 193763 : } else {
583 193763 : self.suffix_len = key.len();
584 193763 : }
585 :
586 : // Is the node too full?
587 92286466 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
588 287555 : return false;
589 91998911 : }
590 91998911 :
591 91998911 : // All clear
592 91998911 : self.num_children += 1;
593 91998911 : self.keys.extend(&key[self.prefix.len()..]);
594 91998911 : self.values.extend(value.0);
595 91998911 :
596 91998911 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
597 91998911 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
598 :
599 91998911 : self.size += self.suffix_len + VALUE_SZ;
600 91998911 :
601 91998911 : true
602 92334666 : }
603 :
604 : ///
605 : /// Perform prefix-compression.
606 : ///
607 : /// Returns 'true' on success, 'false' if no compression was possible.
608 : ///
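/// Because keys are appended in sorted order, any prefix shared by the first
/// and last key on the page is shared by every key in between, so comparing
/// just those two suffixes is enough to find the common prefix.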
609 335690 : fn compress(&mut self) -> bool {
610 335690 : let first_suffix = self.first_suffix();
611 335690 : let last_suffix = self.last_suffix();
612 335690 :
613 335690 : // Find the common prefix among all keys
614 335690 : let mut prefix_len = 0;
615 3381974 : while prefix_len < self.suffix_len {
616 3381974 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
617 335690 : break;
618 3046284 : }
619 3046284 : prefix_len += 1;
620 : }
621 335690 : if prefix_len == 0 {
622 166399 : return false;
623 169291 : }
624 169291 :
625 169291 : // Can compress. Rewrite the keys without the common prefix.
626 169291 : self.prefix.extend(&self.keys[..prefix_len]);
627 169291 :
628 169291 : let mut new_keys = Vec::new();
629 169291 : let mut key_off = 0;
630 44891963 : while key_off < self.keys.len() {
631 44722672 : let next_key_off = key_off + self.suffix_len;
632 44722672 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
633 44722672 : key_off = next_key_off;
634 44722672 : }
635 169291 : self.keys = new_keys;
636 169291 : self.suffix_len -= prefix_len;
637 169291 :
638 169291 : self.size -= prefix_len * self.num_children as usize;
639 169291 : self.size += prefix_len;
640 169291 :
641 169291 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
642 169291 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
643 :
644 169291 : true
645 335690 : }
646 :
647 : ///
648 : /// Serialize the node to on-disk format.
649 : ///
650 193748 : fn pack(&self) -> Bytes {
651 193748 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
652 193748 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
653 193748 : assert!(self.num_children > 0);
654 :
655 193748 : let mut buf = BytesMut::new();
656 193748 :
657 193748 : buf.put_u16(self.num_children);
658 193748 : buf.put_u8(self.level);
659 193748 : buf.put_u8(self.prefix.len() as u8);
660 193748 : buf.put_u8(self.suffix_len as u8);
661 193748 : buf.put(&self.prefix[..]);
662 193748 : buf.put(&self.keys[..]);
663 193748 : buf.put(&self.values[..]);
664 193748 :
665 193748 : assert!(buf.len() == self.size);
666 :
667 193748 : assert!(buf.len() <= PAGE_SZ);
668 193748 : buf.resize(PAGE_SZ, 0);
669 193748 : buf.freeze()
670 193748 : }
671 :
672 512814 : fn first_suffix(&self) -> &[u8] {
673 512814 : &self.keys[..self.suffix_len]
674 512814 : }
675 335690 : fn last_suffix(&self) -> &[u8] {
676 335690 : &self.keys[self.keys.len() - self.suffix_len..]
677 335690 : }
678 :
679 : /// Return the full first key of the page, including the prefix
680 177124 : fn first_key(&self) -> [u8; L] {
681 177124 : let mut key = [0u8; L];
682 177124 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
683 177124 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
684 177124 : key
685 177124 : }
686 : }
687 :
688 : #[cfg(test)]
689 : pub(crate) mod tests {
690 : use super::*;
691 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
692 : use rand::Rng;
693 : use std::collections::BTreeMap;
694 : use std::sync::atomic::{AtomicUsize, Ordering};
695 :
696 5 : #[derive(Clone, Default)]
697 : pub(crate) struct TestDisk {
698 : blocks: Vec<Bytes>,
699 : }
700 : impl TestDisk {
701 5 : fn new() -> Self {
702 5 : Self::default()
703 5 : }
704 508211 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
705 508211 : let mut buf = [0u8; PAGE_SZ];
706 508211 : buf.copy_from_slice(&self.blocks[blknum as usize]);
707 508211 : Ok(std::sync::Arc::new(buf).into())
708 508211 : }
709 : }
710 : impl BlockReader for TestDisk {
711 205622 : fn block_cursor(&self) -> BlockCursor<'_> {
712 205622 : BlockCursor::new(BlockReaderRef::TestDisk(self))
713 205622 : }
714 : }
715 : impl BlockWriter for &mut TestDisk {
716 107 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
717 107 : let blknum = self.blocks.len();
718 107 : self.blocks.push(buf);
719 107 : Ok(blknum as u32)
720 107 : }
721 : }
722 :
723 1 : #[tokio::test]
724 1 : async fn basic() -> Result<()> {
725 1 : let mut disk = TestDisk::new();
726 1 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
727 1 :
728 1 : let all_keys: Vec<&[u8; 6]> = vec![
729 1 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
730 1 : ];
731 1 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
732 1 : .iter()
733 1 : .enumerate()
734 8 : .map(|(idx, key)| (*key, idx as u64))
735 1 : .collect();
736 8 : for (key, val) in all_data.iter() {
737 8 : writer.append(key, *val)?;
738 : }
739 :
740 1 : let (root_offset, _writer) = writer.finish()?;
741 :
742 1 : let reader = DiskBtreeReader::new(0, root_offset, disk);
743 1 :
744 1 : reader.dump().await?;
745 :
746 : // Test the `get` function on all the keys.
747 8 : for (key, val) in all_data.iter() {
748 8 : assert_eq!(reader.get(key).await?, Some(*val));
749 : }
750 : // And on some keys that don't exist
751 1 : assert_eq!(reader.get(b"aaaaaa").await?, None);
752 1 : assert_eq!(reader.get(b"zzzzzz").await?, None);
753 1 : assert_eq!(reader.get(b"xaaabx").await?, None);
754 :
755 : // Test search with `visit` function
756 1 : let search_key = b"xabaaa";
757 1 : let expected: Vec<(Vec<u8>, u64)> = all_data
758 1 : .iter()
759 8 : .filter(|(key, _value)| key[..] >= search_key[..])
760 5 : .map(|(key, value)| (key.to_vec(), *value))
761 1 : .collect();
762 1 :
763 1 : let mut data = Vec::new();
764 1 : reader
765 5 : .visit(search_key, VisitDirection::Forwards, |key, value| {
766 5 : data.push((key.to_vec(), value));
767 5 : true
768 5 : })
769 0 : .await?;
770 1 : assert_eq!(data, expected);
771 :
772 : // Test a backwards scan
773 1 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
774 1 : .iter()
775 8 : .filter(|(key, _value)| key[..] <= search_key[..])
776 4 : .map(|(key, value)| (key.to_vec(), *value))
777 1 : .collect();
778 1 : expected.reverse();
779 1 : let mut data = Vec::new();
780 1 : reader
781 4 : .visit(search_key, VisitDirection::Backwards, |key, value| {
782 4 : data.push((key.to_vec(), value));
783 4 : true
784 4 : })
785 0 : .await?;
786 1 : assert_eq!(data, expected);
787 :
788 : // Backward scan where nothing matches
789 1 : reader
790 1 : .visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
791 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
792 1 : })
793 0 : .await?;
794 :
795 : // Full scan
796 1 : let expected: Vec<(Vec<u8>, u64)> = all_data
797 1 : .iter()
798 8 : .map(|(key, value)| (key.to_vec(), *value))
799 1 : .collect();
800 1 : let mut data = Vec::new();
801 1 : reader
802 8 : .visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
803 8 : data.push((key.to_vec(), value));
804 8 : true
805 8 : })
806 0 : .await?;
807 1 : assert_eq!(data, expected);
808 :
809 1 : Ok(())
810 : }
811 :
812 1 : #[tokio::test]
813 1 : async fn lots_of_keys() -> Result<()> {
814 1 : let mut disk = TestDisk::new();
815 1 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
816 1 :
817 1 : const NUM_KEYS: u64 = 1000;
818 1 :
819 1 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
820 :
821 1001 : for idx in 0..NUM_KEYS {
822 1000 : let key_int: u64 = 1 + idx * 2;
823 1000 : let key = u64::to_be_bytes(key_int);
824 1000 : writer.append(&key, idx)?;
825 :
826 1000 : all_data.insert(key_int, idx);
827 : }
828 :
829 1 : let (root_offset, _writer) = writer.finish()?;
830 :
831 1 : let reader = DiskBtreeReader::new(0, root_offset, disk);
832 1 :
833 1 : reader.dump().await?;
834 :
835 : use std::sync::Mutex;
836 :
837 1 : let result = Mutex::new(Vec::new());
838 1 : let limit: AtomicUsize = AtomicUsize::new(10);
839 41910 : let take_ten = |key: &[u8], value: u64| {
840 41910 : let mut keybuf = [0u8; 8];
841 41910 : keybuf.copy_from_slice(key);
842 41910 : let key_int = u64::from_be_bytes(keybuf);
843 41910 :
844 41910 : let mut result = result.lock().unwrap();
845 41910 : result.push((key_int, value));
846 41910 :
847 41910 : // keep going until we have 10 matches
848 41910 : result.len() < limit.load(Ordering::Relaxed)
849 41910 : };
850 :
851 2010 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
852 2010 : let search_key = u64::to_be_bytes(search_key_int);
853 2010 : assert_eq!(
854 2010 : reader.get(&search_key).await?,
855 2010 : all_data.get(&search_key_int).cloned()
856 : );
857 :
858 : // Test a forward scan starting with this key
859 2010 : result.lock().unwrap().clear();
860 2010 : reader
861 2010 : .visit(&search_key, VisitDirection::Forwards, take_ten)
862 0 : .await?;
863 2010 : let expected = all_data
864 2010 : .range(search_key_int..)
865 2010 : .take(10)
866 19910 : .map(|(&key, &val)| (key, val))
867 2010 : .collect::<Vec<(u64, u64)>>();
868 2010 : assert_eq!(*result.lock().unwrap(), expected);
869 :
870 : // And a backwards scan
871 2010 : result.lock().unwrap().clear();
872 2010 : reader
873 2010 : .visit(&search_key, VisitDirection::Backwards, take_ten)
874 0 : .await?;
875 2010 : let expected = all_data
876 2010 : .range(..=search_key_int)
877 2010 : .rev()
878 2010 : .take(10)
879 20000 : .map(|(&key, &val)| (key, val))
880 2010 : .collect::<Vec<(u64, u64)>>();
881 2010 : assert_eq!(*result.lock().unwrap(), expected);
882 : }
883 :
884 : // full scan
885 1 : let search_key = u64::to_be_bytes(0);
886 1 : limit.store(usize::MAX, Ordering::Relaxed);
887 1 : result.lock().unwrap().clear();
888 1 : reader
889 1 : .visit(&search_key, VisitDirection::Forwards, take_ten)
890 0 : .await?;
891 1 : let expected = all_data
892 1 : .iter()
893 1000 : .map(|(&key, &val)| (key, val))
894 1 : .collect::<Vec<(u64, u64)>>();
895 1 : assert_eq!(*result.lock().unwrap(), expected);
896 :
897 : // full scan
898 1 : let search_key = u64::to_be_bytes(u64::MAX);
899 1 : limit.store(usize::MAX, Ordering::Relaxed);
900 1 : result.lock().unwrap().clear();
901 1 : reader
902 1 : .visit(&search_key, VisitDirection::Backwards, take_ten)
903 0 : .await?;
904 1 : let expected = all_data
905 1 : .iter()
906 1 : .rev()
907 1000 : .map(|(&key, &val)| (key, val))
908 1 : .collect::<Vec<(u64, u64)>>();
909 1 : assert_eq!(*result.lock().unwrap(), expected);
910 :
911 1 : Ok(())
912 : }
913 :
914 1 : #[tokio::test]
915 1 : async fn random_data() -> Result<()> {
916 1 : // Generate random keys with exponential distribution, to
917 1 : // exercise the prefix compression
918 1 : const NUM_KEYS: usize = 100000;
919 1 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
920 100001 : for idx in 0..NUM_KEYS {
921 100000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
922 100000 : let t = -(f64::ln(u));
923 100000 : let key_int = (t * 1000000.0) as u128;
924 100000 :
925 100000 : all_data.insert(key_int, idx as u64);
926 100000 : }
927 :
928 : // Build a tree from it
929 1 : let mut disk = TestDisk::new();
930 1 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
931 :
932 97569 : for (&key, &val) in all_data.iter() {
933 97569 : writer.append(&u128::to_be_bytes(key), val)?;
934 : }
935 1 : let (root_offset, _writer) = writer.finish()?;
936 :
937 1 : let reader = DiskBtreeReader::new(0, root_offset, disk);
938 :
939 : // Test get() operation on all the keys
940 97569 : for (&key, &val) in all_data.iter() {
941 97569 : let search_key = u128::to_be_bytes(key);
942 97569 : assert_eq!(reader.get(&search_key).await?, Some(val));
943 : }
944 :
945 : // Test get() operations on random keys, most of which will not exist
946 100001 : for _ in 0..100000 {
947 100000 : let key_int = rand::thread_rng().gen::<u128>();
948 100000 : let search_key = u128::to_be_bytes(key_int);
949 100000 : assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned());
950 : }
951 :
952 : // Test boundary cases
953 1 : assert!(
954 1 : reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned()
955 : );
956 1 : assert!(
957 1 : reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned()
958 : );
959 :
960 1 : Ok(())
961 : }
962 :
963 1 : #[test]
964 1 : fn unsorted_input() {
965 1 : let mut disk = TestDisk::new();
966 1 : let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
967 1 :
968 1 : let _ = writer.append(b"ba", 1);
969 1 : let _ = writer.append(b"bb", 2);
970 1 : let err = writer.append(b"aa", 3).expect_err("should've failed");
971 1 : match err {
972 1 : DiskBtreeError::UnsortedInput { key, last_key } => {
973 1 : assert_eq!(key.as_ref(), b"aa".as_slice());
974 1 : assert_eq!(last_key.as_ref(), b"bb".as_slice());
975 : }
976 0 : _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
977 : }
978 1 : }
979 :
980 : ///
981 : /// This test contains a particular data set, see disk_btree_test_data.rs
982 : ///
983 1 : #[tokio::test]
984 1 : async fn particular_data() -> Result<()> {
985 1 : // Build a tree from it
986 1 : let mut disk = TestDisk::new();
987 1 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
988 :
989 2001 : for (key, val) in disk_btree_test_data::TEST_DATA {
990 2000 : writer.append(&key, val)?;
991 : }
992 1 : let (root_offset, writer) = writer.finish()?;
993 :
994 1 : println!("SIZE: {} blocks", writer.blocks.len());
995 1 :
996 1 : let reader = DiskBtreeReader::new(0, root_offset, disk);
997 :
998 : // Test get() operation on all the keys
999 2001 : for (key, val) in disk_btree_test_data::TEST_DATA {
1000 2000 : assert_eq!(reader.get(&key).await?, Some(val));
1001 : }
1002 :
1003 : // Test full scan
1004 1 : let mut count = 0;
1005 1 : reader
1006 2000 : .visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
1007 2000 : count += 1;
1008 2000 : true
1009 2000 : })
1010 0 : .await?;
1011 1 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1012 :
1013 1 : reader.dump().await?;
1014 :
1015 1 : Ok(())
1016 : }
1017 : }
1018 :
1019 : #[cfg(test)]
1020 : #[path = "disk_btree_test_data.rs"]
1021 : mod disk_btree_test_data;