Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use byteorder::{ReadBytesExt, BE};
22 : use bytes::{BufMut, Bytes, BytesMut};
23 : use either::Either;
24 : use hex;
25 : use std::{cmp::Ordering, io, result};
26 : use thiserror::Error;
27 : use tracing::error;
28 :
29 : use crate::{
30 : context::{DownloadBehavior, RequestContext},
31 : task_mgr::TaskKind,
32 : tenant::block_io::{BlockReader, BlockWriter},
33 : };
34 :
/// Width in bytes of every value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;

/// Largest value accepted by `DiskBtreeBuilder::append`: every bit of a
/// `VALUE_SZ`-byte value set, except the topmost one.
pub const MAX_VALUE: u64 = (1u64 << (VALUE_SZ * 8 - 1)) - 1;

/// Size in bytes of one page of the tree.
#[allow(dead_code)]
pub const PAGE_SZ: usize = 8192;
41 :
42 0 : #[derive(Clone, Copy, Debug)]
43 : struct Value([u8; VALUE_SZ]);
44 :
45 : impl Value {
46 77609621 : fn from_slice(slice: &[u8]) -> Value {
47 77609621 : let mut b = [0u8; VALUE_SZ];
48 77609621 : b.copy_from_slice(slice);
49 77609621 : Value(b)
50 77609621 : }
51 :
52 52093577 : fn from_u64(x: u64) -> Value {
53 52093577 : assert!(x <= 0x007f_ffff_ffff);
54 52093577 : Value([
55 52093577 : (x >> 32) as u8,
56 52093577 : (x >> 24) as u8,
57 52093577 : (x >> 16) as u8,
58 52093577 : (x >> 8) as u8,
59 52093577 : x as u8,
60 52093577 : ])
61 52093577 : }
62 :
63 101259 : fn from_blknum(x: u32) -> Value {
64 101259 : Value([
65 101259 : 0x80,
66 101259 : (x >> 24) as u8,
67 101259 : (x >> 16) as u8,
68 101259 : (x >> 8) as u8,
69 101259 : x as u8,
70 101259 : ])
71 101259 : }
72 :
73 : #[allow(dead_code)]
74 0 : fn is_offset(self) -> bool {
75 0 : self.0[0] & 0x80 != 0
76 0 : }
77 :
78 71264179 : fn to_u64(self) -> u64 {
79 71264179 : let b = &self.0;
80 71264179 : (b[0] as u64) << 32
81 71264179 : | (b[1] as u64) << 24
82 71264179 : | (b[2] as u64) << 16
83 71264179 : | (b[3] as u64) << 8
84 71264179 : | b[4] as u64
85 71264179 : }
86 :
87 6339417 : fn to_blknum(self) -> u32 {
88 6339417 : let b = &self.0;
89 6339417 : assert!(b[0] == 0x80);
90 6339417 : (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
91 6339417 : }
92 : }
93 :
94 0 : #[derive(Error, Debug)]
95 : pub enum DiskBtreeError {
96 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
97 : AppendOverflow(u64),
98 :
99 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
100 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
101 :
102 : #[error("Could not push to new leaf node")]
103 : FailedToPushToNewLeafNode,
104 :
105 : #[error("IoError: {0}")]
106 : Io(#[from] io::Error),
107 : }
108 :
109 : pub type Result<T> = result::Result<T, DiskBtreeError>;
110 :
111 : /// This is the on-disk representation.
112 : struct OnDiskNode<'a, const L: usize> {
113 : // Fixed-width fields
114 : num_children: u16,
115 : level: u8,
116 : prefix_len: u8,
117 : suffix_len: u8,
118 :
119 : // Variable-length fields. These are stored on-disk after the fixed-width
120 : // fields, in this order. In the in-memory representation, these point to
121 : // the right parts in the page buffer.
122 : prefix: &'a [u8],
123 : keys: &'a [u8],
124 : values: &'a [u8],
125 : }
126 :
127 : impl<'a, const L: usize> OnDiskNode<'a, L> {
128 : ///
129 : /// Interpret a PAGE_SZ page as a node.
130 : ///
131 23909552 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
132 23909552 : let mut cursor = std::io::Cursor::new(buf);
133 23909552 : let num_children = cursor.read_u16::<BE>()?;
134 23909552 : let level = cursor.read_u8()?;
135 23909552 : let prefix_len = cursor.read_u8()?;
136 23909552 : let suffix_len = cursor.read_u8()?;
137 :
138 23909552 : let mut off = cursor.position();
139 23909552 : let prefix_off = off as usize;
140 23909552 : off += prefix_len as u64;
141 23909552 :
142 23909552 : let keys_off = off as usize;
143 23909552 : let keys_len = num_children as usize * suffix_len as usize;
144 23909552 : off += keys_len as u64;
145 23909552 :
146 23909552 : let values_off = off as usize;
147 23909552 : let values_len = num_children as usize * VALUE_SZ;
148 23909552 : //off += values_len as u64;
149 23909552 :
150 23909552 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
151 23909552 : let keys = &buf[keys_off..keys_off + keys_len];
152 23909552 : let values = &buf[values_off..values_off + values_len];
153 23909552 :
154 23909552 : Ok(OnDiskNode {
155 23909552 : num_children,
156 23909552 : level,
157 23909552 : prefix_len,
158 23909552 : suffix_len,
159 23909552 : prefix,
160 23909552 : keys,
161 23909552 : values,
162 23909552 : })
163 23909552 : }
164 :
165 : ///
166 : /// Read a value at 'idx'
167 : ///
168 77609544 : fn value(&self, idx: usize) -> Value {
169 77609544 : let value_off = idx * VALUE_SZ;
170 77609544 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
171 77609544 : Value::from_slice(value_slice)
172 77609544 : }
173 :
174 23598435 : fn binary_search(
175 23598435 : &self,
176 23598435 : search_key: &[u8; L],
177 23598435 : keybuf: &mut [u8],
178 23598435 : ) -> result::Result<usize, usize> {
179 23598435 : let mut size = self.num_children as usize;
180 23598435 : let mut low = 0;
181 23598435 : let mut high = size;
182 144526114 : while low < high {
183 121592325 : let mid = low + size / 2;
184 121592325 :
185 121592325 : let key_off = mid * self.suffix_len as usize;
186 121592325 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
187 121592325 : // Does this match?
188 121592325 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
189 121592325 :
190 121592325 : let cmp = keybuf[..].cmp(search_key);
191 121592325 :
192 121592325 : if cmp == Ordering::Less {
193 39436071 : low = mid + 1;
194 82156254 : } else if cmp == Ordering::Greater {
195 81491608 : high = mid;
196 81491608 : } else {
197 664646 : return Ok(mid);
198 : }
199 120927679 : size = high - low;
200 : }
201 22933789 : Err(low)
202 23598435 : }
203 : }
204 :
205 : ///
206 : /// Public reader object, to search the tree.
207 : ///
208 : pub struct DiskBtreeReader<R, const L: usize>
209 : where
210 : R: BlockReader,
211 : {
212 : start_blk: u32,
213 : root_blk: u32,
214 : reader: R,
215 : }
216 :
217 23598436 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
218 : pub enum VisitDirection {
219 : Forwards,
220 : Backwards,
221 : }
222 :
223 : impl<R, const L: usize> DiskBtreeReader<R, L>
224 : where
225 : R: BlockReader,
226 : {
227 16847879 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
228 16847879 : DiskBtreeReader {
229 16847879 : start_blk,
230 16847879 : root_blk,
231 16847879 : reader,
232 16847879 : }
233 16847879 : }
234 :
235 : ///
236 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
237 : ///
238 831129 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
239 831129 : let mut result: Option<u64> = None;
240 831129 : self.visit(
241 831129 : search_key,
242 831129 : VisitDirection::Forwards,
243 831129 : |key, value| {
244 631105 : if key == search_key {
245 629096 : result = Some(value);
246 629096 : }
247 631105 : false
248 831129 : },
249 831129 : ctx,
250 831129 : )
251 6330 : .await?;
252 831129 : Ok(result)
253 831129 : }
254 :
255 : ///
256 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
257 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
258 : /// backwards)
259 : ///
260 17259027 : pub async fn visit<V>(
261 17259027 : &self,
262 17259027 : search_key: &[u8; L],
263 17259027 : dir: VisitDirection,
264 17259027 : mut visitor: V,
265 17259027 : ctx: &RequestContext,
266 17259027 : ) -> Result<bool>
267 17259027 : where
268 17259027 : V: FnMut(&[u8], u64) -> bool,
269 17259027 : {
270 17259027 : let mut stack = Vec::new();
271 17259027 : stack.push((self.root_blk, None));
272 17259027 : let block_cursor = self.reader.block_cursor();
273 24110255 : while let Some((node_blknum, opt_iter)) = stack.pop() {
274 : // Locate the node.
275 23903521 : let node_buf = block_cursor
276 23903521 : .read_blk(self.start_blk + node_blknum, ctx)
277 222413 : .await?;
278 :
279 23903520 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
280 23903520 : let prefix_len = node.prefix_len as usize;
281 23903520 : let suffix_len = node.suffix_len as usize;
282 23903520 :
283 23903520 : assert!(node.num_children > 0);
284 :
285 23903520 : let mut keybuf = Vec::new();
286 23903520 : keybuf.extend(node.prefix);
287 23903520 : keybuf.resize(prefix_len + suffix_len, 0);
288 :
289 23903520 : let mut iter = if let Some(iter) = opt_iter {
290 305085 : iter
291 23598435 : } else if dir == VisitDirection::Forwards {
292 : // Locate the first match
293 1352938 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
294 631502 : Ok(idx) => idx,
295 721436 : Err(idx) => {
296 721436 : if node.level == 0 {
297 : // Imagine that the node contains the following keys:
298 : //
299 : // 1
300 : // 3 <-- idx
301 : // 5
302 : //
303 : // If the search key is '2' and there is exact match,
304 : // the binary search would return the index of key
305 : // '3'. That's cool, '3' is the first key to return.
306 238854 : idx
307 : } else {
308 : // This is an internal page, so each key represents a lower
309 : // bound for what's in the child page. If there is no exact
310 : // match, we have to return the *previous* entry.
311 : //
312 : // 1 <-- return this
313 : // 3 <-- idx
314 : // 5
315 482582 : idx.saturating_sub(1)
316 : }
317 : }
318 : };
319 1352938 : Either::Left(idx..node.num_children.into())
320 : } else {
321 22245497 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
322 33144 : Ok(idx) => {
323 33144 : // Exact match. That's the first entry to return, and walk
324 33144 : // backwards from there.
325 33144 : idx
326 : }
327 22212353 : Err(idx) => {
328 : // No exact match. The binary search returned the index of the
329 : // first key that's > search_key. Back off by one, and walk
330 : // backwards from there.
331 22212353 : if let Some(idx) = idx.checked_sub(1) {
332 11840056 : idx
333 : } else {
334 10372297 : return Ok(false);
335 : }
336 : }
337 : };
338 11873200 : Either::Right((0..=idx).rev())
339 : };
340 :
341 : // idx points to the first match now. Keep going from there
342 78115332 : while let Some(idx) = iter.next() {
343 77603513 : let key_off = idx * suffix_len;
344 77603513 : let suffix = &node.keys[key_off..key_off + suffix_len];
345 77603513 : keybuf[prefix_len..].copy_from_slice(suffix);
346 77603513 : let value = node.value(idx);
347 77603513 : #[allow(clippy::collapsible_if)]
348 77603513 : if node.level == 0 {
349 : // leaf
350 71264104 : if !visitor(&keybuf, value.to_u64()) {
351 6679994 : return Ok(false);
352 64584109 : }
353 : } else {
354 6339409 : stack.push((node_blknum, Some(iter)));
355 6339409 : stack.push((value.to_blknum(), None));
356 6339409 : break;
357 : }
358 : }
359 : }
360 206734 : Ok(true)
361 17259025 : }
362 :
363 : #[allow(dead_code)]
364 10 : pub async fn dump(&self) -> Result<()> {
365 10 : let mut stack = Vec::new();
366 10 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
367 10 :
368 10 : stack.push((self.root_blk, String::new(), 0, 0, 0));
369 10 :
370 10 : let block_cursor = self.reader.block_cursor();
371 :
372 6042 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
373 6032 : let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
374 6032 : let buf: &[u8] = blk.as_ref();
375 6032 : let node = OnDiskNode::<L>::deparse(buf)?;
376 :
377 6032 : if child_idx == 0 {
378 18 : print!("{:indent$}", "", indent = depth * 2);
379 18 : let path_prefix = stack
380 18 : .iter()
381 18 : .map(|(_blknum, path, ..)| path.as_str())
382 18 : .collect::<String>();
383 18 : println!(
384 18 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
385 18 : hex::encode(node.prefix),
386 18 : node.suffix_len
387 18 : );
388 6014 : }
389 :
390 6032 : if child_idx + 1 < node.num_children {
391 6014 : let key_off = key_off + node.suffix_len as usize;
392 6014 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
393 6014 : }
394 6032 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
395 6032 : let val = node.value(child_idx as usize);
396 6032 :
397 6032 : print!("{:indent$}", "", indent = depth * 2 + 2);
398 6032 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
399 6032 :
400 6032 : if node.level > 0 {
401 8 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
402 6024 : }
403 : }
404 10 : Ok(())
405 10 : }
406 : }
407 :
408 : ///
409 : /// Public builder object, for creating a new tree.
410 : ///
411 : /// Usage: Create a builder object by calling 'new', load all the data into the
412 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
413 : ///
414 : /// 'L' is the key length in bytes
415 : pub struct DiskBtreeBuilder<W, const L: usize>
416 : where
417 : W: BlockWriter,
418 : {
419 : writer: W,
420 :
421 : ///
422 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
423 : ///
424 : /// We maintain the length of the stack to be always greater than zero.
425 : /// Two exceptions are:
426 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
427 : /// So because other methods cannot see the intermediate state invariant still holds.
428 : /// 2. `Self::finish`. It consumes self and does not return it back,
429 : /// which means that this is where the structure is destroyed.
430 : /// Thus stack of zero length cannot be observed by other methods.
431 : stack: Vec<BuildNode<L>>,
432 :
433 : /// Last key that was appended to the tree. Used to sanity check that append
434 : /// is called in increasing key order.
435 : last_key: Option<[u8; L]>,
436 : }
437 :
438 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
439 : where
440 : W: BlockWriter,
441 : {
442 22318 : pub fn new(writer: W) -> Self {
443 22318 : DiskBtreeBuilder {
444 22318 : writer,
445 22318 : last_key: None,
446 22318 : stack: vec![BuildNode::new(0)],
447 22318 : }
448 22318 : }
449 :
450 52093575 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
451 52093575 : if value > MAX_VALUE {
452 0 : return Err(DiskBtreeError::AppendOverflow(value));
453 52093575 : }
454 52093575 : if let Some(last_key) = &self.last_key {
455 52071257 : if key <= last_key {
456 2 : return Err(DiskBtreeError::UnsortedInput {
457 2 : key: key.as_slice().into(),
458 2 : last_key: last_key.as_slice().into(),
459 2 : });
460 52071255 : }
461 22318 : }
462 52093573 : self.last_key = Some(*key);
463 52093573 :
464 52093573 : self.append_internal(key, Value::from_u64(value))
465 52093575 : }
466 :
467 52194832 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
468 52194832 : // Try to append to the current leaf buffer
469 52194832 : let last = self
470 52194832 : .stack
471 52194832 : .last_mut()
472 52194832 : .expect("should always have at least one item");
473 52194832 : let level = last.level;
474 52194832 : if last.push(key, value) {
475 52006670 : return Ok(());
476 188162 : }
477 188162 :
478 188162 : // It did not fit. Try to compress, and if it succeeds to make
479 188162 : // some room on the node, try appending to it again.
480 188162 : #[allow(clippy::collapsible_if)]
481 188162 : if last.compress() {
482 95307 : if last.push(key, value) {
483 95237 : return Ok(());
484 70 : }
485 92855 : }
486 :
487 : // Could not append to the current leaf. Flush it and create a new one.
488 92925 : self.flush_node()?;
489 :
490 : // Replace the node we flushed with an empty one and append the new
491 : // key to it.
492 92925 : let mut last = BuildNode::new(level);
493 92925 : if !last.push(key, value) {
494 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
495 92925 : }
496 92925 :
497 92925 : self.stack.push(last);
498 92925 :
499 92925 : Ok(())
500 52194832 : }
501 :
502 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
503 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
504 : /// creates a new root page, increasing the height of the tree.
505 101259 : fn flush_node(&mut self) -> Result<()> {
506 101259 : // Get the current bottommost node in the stack and flush it to disk.
507 101259 : let last = self
508 101259 : .stack
509 101259 : .pop()
510 101259 : .expect("should always have at least one item");
511 101259 : let buf = last.pack();
512 101259 : let downlink_key = last.first_key();
513 101259 : let downlink_ptr = self.writer.write_blk(buf)?;
514 :
515 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
516 : // create a new root page, increasing the height of the tree.
517 101259 : if self.stack.is_empty() {
518 8338 : self.stack.push(BuildNode::new(last.level + 1));
519 92921 : }
520 101259 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
521 101259 : }
522 :
523 : ///
524 : /// Flushes everything to disk, and returns the block number of the root page.
525 : /// The caller must store the root block number "out-of-band", and pass it
526 : /// to the DiskBtreeReader::new() when you want to read the tree again.
527 : /// (In the image and delta layers, it is stored in the beginning of the file,
528 : /// in the summary header)
529 : ///
530 22312 : pub fn finish(mut self) -> Result<(u32, W)> {
531 : // flush all levels, except the root.
532 30646 : while self.stack.len() > 1 {
533 8334 : self.flush_node()?;
534 : }
535 :
536 22312 : let root = self
537 22312 : .stack
538 22312 : .first()
539 22312 : .expect("by the check above we left one item there");
540 22312 : let buf = root.pack();
541 22312 : let root_blknum = self.writer.write_blk(buf)?;
542 :
543 22312 : Ok((root_blknum, self.writer))
544 22312 : }
545 :
546 2181017 : pub fn borrow_writer(&self) -> &W {
547 2181017 : &self.writer
548 2181017 : }
549 : }
550 :
551 : ///
552 : /// BuildNode represesnts an incomplete page that we are appending to.
553 : ///
554 0 : #[derive(Clone, Debug)]
555 : struct BuildNode<const L: usize> {
556 : num_children: u16,
557 : level: u8,
558 : prefix: Vec<u8>,
559 : suffix_len: usize,
560 :
561 : keys: Vec<u8>,
562 : values: Vec<u8>,
563 :
564 : size: usize, // physical size of this node, if it was written to disk like this
565 : }
566 :
567 : const NODE_SIZE: usize = PAGE_SZ;
568 :
569 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
570 :
571 : impl<const L: usize> BuildNode<L> {
572 123581 : fn new(level: u8) -> Self {
573 123581 : BuildNode {
574 123581 : num_children: 0,
575 123581 : level,
576 123581 : prefix: Vec::new(),
577 123581 : suffix_len: 0,
578 123581 : keys: Vec::new(),
579 123581 : values: Vec::new(),
580 123581 : size: NODE_HDR_SIZE,
581 123581 : }
582 123581 : }
583 :
584 : /// Try to append a key-value pair to this node. Returns 'true' on
585 : /// success, 'false' if the page was full or the key was
586 : /// incompatible with the prefix of the existing keys.
587 52383064 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
588 52383064 : // If we have already performed prefix-compression on the page,
589 52383064 : // check that the incoming key has the same prefix.
590 52383064 : if self.num_children > 0 {
591 : // does the prefix allow it?
592 52259483 : if !key.starts_with(&self.prefix) {
593 23285 : return false;
594 52236198 : }
595 123581 : } else {
596 123581 : self.suffix_len = key.len();
597 123581 : }
598 :
599 : // Is the node too full?
600 52359779 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
601 164947 : return false;
602 52194832 : }
603 52194832 :
604 52194832 : // All clear
605 52194832 : self.num_children += 1;
606 52194832 : self.keys.extend(&key[self.prefix.len()..]);
607 52194832 : self.values.extend(value.0);
608 52194832 :
609 52194832 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
610 52194832 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
611 :
612 52194832 : self.size += self.suffix_len + VALUE_SZ;
613 52194832 :
614 52194832 : true
615 52383064 : }
616 :
617 : ///
618 : /// Perform prefix-compression.
619 : ///
620 : /// Returns 'true' on success, 'false' if no compression was possible.
621 : ///
622 188162 : fn compress(&mut self) -> bool {
623 188162 : let first_suffix = self.first_suffix();
624 188162 : let last_suffix = self.last_suffix();
625 188162 :
626 188162 : // Find the common prefix among all keys
627 188162 : let mut prefix_len = 0;
628 1850918 : while prefix_len < self.suffix_len {
629 1850918 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
630 188162 : break;
631 1662756 : }
632 1662756 : prefix_len += 1;
633 : }
634 188162 : if prefix_len == 0 {
635 92855 : return false;
636 95307 : }
637 95307 :
638 95307 : // Can compress. Rewrite the keys without the common prefix.
639 95307 : self.prefix.extend(&self.keys[..prefix_len]);
640 95307 :
641 95307 : let mut new_keys = Vec::new();
642 95307 : let mut key_off = 0;
643 25298659 : while key_off < self.keys.len() {
644 25203352 : let next_key_off = key_off + self.suffix_len;
645 25203352 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
646 25203352 : key_off = next_key_off;
647 25203352 : }
648 95307 : self.keys = new_keys;
649 95307 : self.suffix_len -= prefix_len;
650 95307 :
651 95307 : self.size -= prefix_len * self.num_children as usize;
652 95307 : self.size += prefix_len;
653 95307 :
654 95307 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
655 95307 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
656 :
657 95307 : true
658 188162 : }
659 :
660 : ///
661 : /// Serialize the node to on-disk format.
662 : ///
663 123571 : fn pack(&self) -> Bytes {
664 123571 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
665 123571 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
666 123571 : assert!(self.num_children > 0);
667 :
668 123571 : let mut buf = BytesMut::new();
669 123571 :
670 123571 : buf.put_u16(self.num_children);
671 123571 : buf.put_u8(self.level);
672 123571 : buf.put_u8(self.prefix.len() as u8);
673 123571 : buf.put_u8(self.suffix_len as u8);
674 123571 : buf.put(&self.prefix[..]);
675 123571 : buf.put(&self.keys[..]);
676 123571 : buf.put(&self.values[..]);
677 123571 :
678 123571 : assert!(buf.len() == self.size);
679 :
680 123571 : assert!(buf.len() <= PAGE_SZ);
681 123571 : buf.resize(PAGE_SZ, 0);
682 123571 : buf.freeze()
683 123571 : }
684 :
685 289421 : fn first_suffix(&self) -> &[u8] {
686 289421 : &self.keys[..self.suffix_len]
687 289421 : }
688 188162 : fn last_suffix(&self) -> &[u8] {
689 188162 : &self.keys[self.keys.len() - self.suffix_len..]
690 188162 : }
691 :
692 : /// Return the full first key of the page, including the prefix
693 101259 : fn first_key(&self) -> [u8; L] {
694 101259 : let mut key = [0u8; L];
695 101259 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
696 101259 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
697 101259 : key
698 101259 : }
699 : }
700 :
701 : #[cfg(test)]
702 : pub(crate) mod tests {
703 : use super::*;
704 : use crate::context::DownloadBehavior;
705 : use crate::task_mgr::TaskKind;
706 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
707 : use rand::Rng;
708 : use std::collections::BTreeMap;
709 : use std::sync::atomic::{AtomicUsize, Ordering};
710 :
711 10 : #[derive(Clone, Default)]
712 : pub(crate) struct TestDisk {
713 : blocks: Vec<Bytes>,
714 : }
715 : impl TestDisk {
716 10 : fn new() -> Self {
717 10 : Self::default()
718 10 : }
719 1016266 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
720 1016266 : let mut buf = [0u8; PAGE_SZ];
721 1016266 : buf.copy_from_slice(&self.blocks[blknum as usize]);
722 1016266 : Ok(std::sync::Arc::new(buf).into())
723 1016266 : }
724 : }
725 : impl BlockReader for TestDisk {
726 411166 : fn block_cursor(&self) -> BlockCursor<'_> {
727 411166 : BlockCursor::new(BlockReaderRef::TestDisk(self))
728 411166 : }
729 : }
730 : impl BlockWriter for &mut TestDisk {
731 213 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
732 213 : let blknum = self.blocks.len();
733 213 : self.blocks.push(buf);
734 213 : Ok(blknum as u32)
735 213 : }
736 : }
737 :
738 2 : #[tokio::test]
739 2 : async fn basic() -> Result<()> {
740 2 : let mut disk = TestDisk::new();
741 2 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
742 2 :
743 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
744 2 :
745 2 : let all_keys: Vec<&[u8; 6]> = vec![
746 2 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
747 2 : ];
748 2 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
749 2 : .iter()
750 2 : .enumerate()
751 16 : .map(|(idx, key)| (*key, idx as u64))
752 2 : .collect();
753 16 : for (key, val) in all_data.iter() {
754 16 : writer.append(key, *val)?;
755 2 : }
756 2 :
757 2 : let (root_offset, _writer) = writer.finish()?;
758 2 :
759 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
760 2 :
761 2 : reader.dump().await?;
762 2 :
763 2 : // Test the `get` function on all the keys.
764 16 : for (key, val) in all_data.iter() {
765 16 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
766 2 : }
767 2 : // And on some keys that don't exist
768 2 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
769 2 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
770 2 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
771 2 :
772 2 : // Test search with `visit` function
773 2 : let search_key = b"xabaaa";
774 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
775 2 : .iter()
776 16 : .filter(|(key, _value)| key[..] >= search_key[..])
777 10 : .map(|(key, value)| (key.to_vec(), *value))
778 2 : .collect();
779 2 :
780 2 : let mut data = Vec::new();
781 2 : reader
782 2 : .visit(
783 2 : search_key,
784 2 : VisitDirection::Forwards,
785 10 : |key, value| {
786 10 : data.push((key.to_vec(), value));
787 10 : true
788 10 : },
789 2 : &ctx,
790 2 : )
791 2 : .await?;
792 2 : assert_eq!(data, expected);
793 2 :
794 2 : // Test a backwards scan
795 2 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
796 2 : .iter()
797 16 : .filter(|(key, _value)| key[..] <= search_key[..])
798 8 : .map(|(key, value)| (key.to_vec(), *value))
799 2 : .collect();
800 2 : expected.reverse();
801 2 : let mut data = Vec::new();
802 2 : reader
803 2 : .visit(
804 2 : search_key,
805 2 : VisitDirection::Backwards,
806 8 : |key, value| {
807 8 : data.push((key.to_vec(), value));
808 8 : true
809 8 : },
810 2 : &ctx,
811 2 : )
812 2 : .await?;
813 2 : assert_eq!(data, expected);
814 2 :
815 2 : // Backward scan where nothing matches
816 2 : reader
817 2 : .visit(
818 2 : b"aaaaaa",
819 2 : VisitDirection::Backwards,
820 2 : |key, value| {
821 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
822 2 : },
823 2 : &ctx,
824 2 : )
825 2 : .await?;
826 2 :
827 2 : // Full scan
828 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
829 2 : .iter()
830 16 : .map(|(key, value)| (key.to_vec(), *value))
831 2 : .collect();
832 2 : let mut data = Vec::new();
833 2 : reader
834 2 : .visit(
835 2 : &[0u8; 6],
836 2 : VisitDirection::Forwards,
837 16 : |key, value| {
838 16 : data.push((key.to_vec(), value));
839 16 : true
840 16 : },
841 2 : &ctx,
842 2 : )
843 2 : .await?;
844 2 : assert_eq!(data, expected);
845 2 :
846 2 : Ok(())
847 2 : }
848 :
849 2 : #[tokio::test]
850 2 : async fn lots_of_keys() -> Result<()> {
851 2 : let mut disk = TestDisk::new();
852 2 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
853 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
854 2 :
855 2 : const NUM_KEYS: u64 = 1000;
856 2 :
857 2 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
858 2 :
859 2002 : for idx in 0..NUM_KEYS {
860 2000 : let key_int: u64 = 1 + idx * 2;
861 2000 : let key = u64::to_be_bytes(key_int);
862 2000 : writer.append(&key, idx)?;
863 2 :
864 2000 : all_data.insert(key_int, idx);
865 2 : }
866 2 :
867 2 : let (root_offset, _writer) = writer.finish()?;
868 2 :
869 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
870 2 :
871 2 : reader.dump().await?;
872 2 :
873 2 : use std::sync::Mutex;
874 2 :
875 2 : let result = Mutex::new(Vec::new());
876 2 : let limit: AtomicUsize = AtomicUsize::new(10);
877 83820 : let take_ten = |key: &[u8], value: u64| {
878 83820 : let mut keybuf = [0u8; 8];
879 83820 : keybuf.copy_from_slice(key);
880 83820 : let key_int = u64::from_be_bytes(keybuf);
881 83820 :
882 83820 : let mut result = result.lock().unwrap();
883 83820 : result.push((key_int, value));
884 83820 :
885 83820 : // keep going until we have 10 matches
886 83820 : result.len() < limit.load(Ordering::Relaxed)
887 83820 : };
888 2 :
889 4020 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
890 4020 : let search_key = u64::to_be_bytes(search_key_int);
891 4020 : assert_eq!(
892 4020 : reader.get(&search_key, &ctx).await?,
893 4020 : all_data.get(&search_key_int).cloned()
894 2 : );
895 2 :
896 2 : // Test a forward scan starting with this key
897 4020 : result.lock().unwrap().clear();
898 4020 : reader
899 4020 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
900 2 : .await?;
901 4020 : let expected = all_data
902 4020 : .range(search_key_int..)
903 4020 : .take(10)
904 39820 : .map(|(&key, &val)| (key, val))
905 4020 : .collect::<Vec<(u64, u64)>>();
906 4020 : assert_eq!(*result.lock().unwrap(), expected);
907 2 :
908 2 : // And a backwards scan
909 4020 : result.lock().unwrap().clear();
910 4020 : reader
911 4020 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
912 2 : .await?;
913 4020 : let expected = all_data
914 4020 : .range(..=search_key_int)
915 4020 : .rev()
916 4020 : .take(10)
917 40000 : .map(|(&key, &val)| (key, val))
918 4020 : .collect::<Vec<(u64, u64)>>();
919 4020 : assert_eq!(*result.lock().unwrap(), expected);
920 2 : }
921 2 :
922 2 : // full scan
923 2 : let search_key = u64::to_be_bytes(0);
924 2 : limit.store(usize::MAX, Ordering::Relaxed);
925 2 : result.lock().unwrap().clear();
926 2 : reader
927 2 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
928 2 : .await?;
929 2 : let expected = all_data
930 2 : .iter()
931 2000 : .map(|(&key, &val)| (key, val))
932 2 : .collect::<Vec<(u64, u64)>>();
933 2 : assert_eq!(*result.lock().unwrap(), expected);
934 2 :
935 2 : // full scan
936 2 : let search_key = u64::to_be_bytes(u64::MAX);
937 2 : limit.store(usize::MAX, Ordering::Relaxed);
938 2 : result.lock().unwrap().clear();
939 2 : reader
940 2 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
941 2 : .await?;
942 2 : let expected = all_data
943 2 : .iter()
944 2 : .rev()
945 2000 : .map(|(&key, &val)| (key, val))
946 2 : .collect::<Vec<(u64, u64)>>();
947 2 : assert_eq!(*result.lock().unwrap(), expected);
948 2 :
949 2 : Ok(())
950 2 : }
951 :
952 2 : #[tokio::test]
953 2 : async fn random_data() -> Result<()> {
954 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
955 2 :
956 2 : // Generate random keys with exponential distribution, to
957 2 : // exercise the prefix compression
958 2 : const NUM_KEYS: usize = 100000;
959 2 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
960 200002 : for idx in 0..NUM_KEYS {
961 200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
962 200000 : let t = -(f64::ln(u));
963 200000 : let key_int = (t * 1000000.0) as u128;
964 200000 :
965 200000 : all_data.insert(key_int, idx as u64);
966 200000 : }
967 2 :
968 2 : // Build a tree from it
969 2 : let mut disk = TestDisk::new();
970 2 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
971 2 :
972 195060 : for (&key, &val) in all_data.iter() {
973 195060 : writer.append(&u128::to_be_bytes(key), val)?;
974 2 : }
975 2 : let (root_offset, _writer) = writer.finish()?;
976 2 :
977 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
978 2 :
979 2 : // Test get() operation on all the keys
980 195060 : for (&key, &val) in all_data.iter() {
981 195060 : let search_key = u128::to_be_bytes(key);
982 195060 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
983 2 : }
984 2 :
985 2 : // Test get() operations on random keys, most of which will not exist
986 200002 : for _ in 0..100000 {
987 200000 : let key_int = rand::thread_rng().gen::<u128>();
988 200000 : let search_key = u128::to_be_bytes(key_int);
989 200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
990 2 : }
991 2 :
992 2 : // Test boundary cases
993 2 : assert!(
994 2 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
995 2 : == all_data.get(&u128::MIN).cloned()
996 2 : );
997 2 : assert!(
998 2 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
999 2 : == all_data.get(&u128::MAX).cloned()
1000 2 : );
1001 2 :
1002 2 : Ok(())
1003 2 : }
1004 :
/// Appending a key that sorts before the previously appended key must be
/// rejected with `DiskBtreeError::UnsortedInput`, and the error must carry
/// both the offending key and the last key that was accepted.
#[test]
fn unsorted_input() {
    let mut disk = TestDisk::new();
    let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

    // The two in-order appends are setup for the real check below. Fail
    // loudly here if they do not succeed, instead of discarding the result
    // and getting a misleading mismatch later on.
    writer
        .append(b"ba", 1)
        .expect("in-order append of \"ba\" should succeed");
    writer
        .append(b"bb", 2)
        .expect("in-order append of \"bb\" should succeed");

    // "aa" sorts before "bb", so this append has to be refused.
    let err = writer.append(b"aa", 3).expect_err("should've failed");
    match err {
        DiskBtreeError::UnsortedInput { key, last_key } => {
            assert_eq!(key.as_ref(), b"aa".as_slice());
            assert_eq!(last_key.as_ref(), b"bb".as_slice());
        }
        _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    }
}
1021 :
1022 : ///
1023 : /// This test contains a particular data set, see disk_btree_test_data.rs
1024 : ///
1025 2 : #[tokio::test]
1026 2 : async fn particular_data() -> Result<()> {
1027 2 : // Build a tree from it
1028 2 : let mut disk = TestDisk::new();
1029 2 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1030 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1031 2 :
1032 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1033 4000 : writer.append(&key, val)?;
1034 2 : }
1035 2 : let (root_offset, writer) = writer.finish()?;
1036 2 :
1037 2 : println!("SIZE: {} blocks", writer.blocks.len());
1038 2 :
1039 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1040 2 :
1041 2 : // Test get() operation on all the keys
1042 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1043 4000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1044 2 : }
1045 2 :
1046 2 : // Test full scan
1047 2 : let mut count = 0;
1048 2 : reader
1049 2 : .visit(
1050 2 : &[0u8; 26],
1051 2 : VisitDirection::Forwards,
1052 4000 : |_key, _value| {
1053 4000 : count += 1;
1054 4000 : true
1055 4000 : },
1056 2 : &ctx,
1057 2 : )
1058 2 : .await?;
1059 2 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1060 2 :
1061 2 : reader.dump().await?;
1062 2 :
1063 2 : Ok(())
1064 2 : }
1065 : }
1066 :
1067 : #[cfg(test)]
1068 : #[path = "disk_btree_test_data.rs"]
1069 : mod disk_btree_test_data;
|