1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a single bulk operation. Insertion and deletion
10 : //! after creation are not supported
11 : //! - Page-oriented
12 : //!
13 : //! TODO:
14 : //! - Maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - The values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
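//!
//! A minimal usage sketch (illustrative only; `block_writer`, `block_reader` and
//! `ctx` stand in for some `BlockWriter`, `BlockReader` and `RequestContext`, and
//! the keys and values are made up):
//!
//! ```ignore
//! // Build: keys must be appended in strictly increasing order.
//! let mut builder = DiskBtreeBuilder::<_, 8>::new(block_writer);
//! builder.append(b"00000001", 16)?;
//! builder.append(b"00000002", 32)?;
//! let (root_blk, block_writer) = builder.finish()?;
//!
//! // Read back. The root block number must be stored "out-of-band" by the caller.
//! let reader = DiskBtreeReader::<_, 8>::new(0, root_blk, block_reader);
//! assert_eq!(reader.get(b"00000002", &ctx).await?, Some(32));
//! ```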
21 : use async_stream::try_stream;
22 : use byteorder::{ReadBytesExt, BE};
23 : use bytes::{BufMut, Bytes, BytesMut};
24 : use either::Either;
25 : use futures::Stream;
26 : use hex;
27 : use std::{
28 : cmp::Ordering,
29 : io,
30 : iter::Rev,
31 : ops::{Range, RangeInclusive},
32 : result,
33 : };
34 : use thiserror::Error;
35 : use tracing::error;
36 :
37 : use crate::{
38 : context::{DownloadBehavior, RequestContext},
39 : task_mgr::TaskKind,
40 : tenant::block_io::{BlockReader, BlockWriter},
41 : };
42 :
43 : // The fixed size of a value stored in the B-tree. 5 bytes is currently enough.
44 : pub const VALUE_SZ: usize = 5;
45 : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
46 :
47 : pub const PAGE_SZ: usize = 8192;
48 :
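// A Value packs either a plain u64 (at most MAX_VALUE, so the high bit of the
// first byte is always clear) or a u32 block number (marked by setting the high
// bit, see `from_blknum`) into VALUE_SZ big-endian bytes. Block numbers are used
// as downlinks from internal nodes to their child pages.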
49 : #[derive(Clone, Copy, Debug)]
50 : struct Value([u8; VALUE_SZ]);
51 :
52 : impl Value {
53 3467941 : fn from_slice(slice: &[u8]) -> Value {
54 3467941 : let mut b = [0u8; VALUE_SZ];
55 3467941 : b.copy_from_slice(slice);
56 3467941 : Value(b)
57 3467941 : }
58 :
59 6662863 : fn from_u64(x: u64) -> Value {
60 6662863 : assert!(x <= 0x007f_ffff_ffff);
61 6662863 : Value([
62 6662863 : (x >> 32) as u8,
63 6662863 : (x >> 24) as u8,
64 6662863 : (x >> 16) as u8,
65 6662863 : (x >> 8) as u8,
66 6662863 : x as u8,
67 6662863 : ])
68 6662863 : }
69 :
70 12421 : fn from_blknum(x: u32) -> Value {
71 12421 : Value([
72 12421 : 0x80,
73 12421 : (x >> 24) as u8,
74 12421 : (x >> 16) as u8,
75 12421 : (x >> 8) as u8,
76 12421 : x as u8,
77 12421 : ])
78 12421 : }
79 :
80 : #[allow(dead_code)]
81 0 : fn is_offset(self) -> bool {
82 0 : self.0[0] & 0x80 != 0
83 0 : }
84 :
85 2819145 : fn to_u64(self) -> u64 {
86 2819145 : let b = &self.0;
87 2819145 : (b[0] as u64) << 32
88 2819145 : | (b[1] as u64) << 24
89 2819145 : | (b[2] as u64) << 16
90 2819145 : | (b[3] as u64) << 8
91 2819145 : | b[4] as u64
92 2819145 : }
93 :
94 642772 : fn to_blknum(self) -> u32 {
95 642772 : let b = &self.0;
96 642772 : assert!(b[0] == 0x80);
97 642772 : (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
98 642772 : }
99 : }
100 :
101 0 : #[derive(Error, Debug)]
102 : pub enum DiskBtreeError {
103 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
104 : AppendOverflow(u64),
105 :
106 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
107 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
108 :
109 : #[error("Could not push to new leaf node")]
110 : FailedToPushToNewLeafNode,
111 :
112 : #[error("IoError: {0}")]
113 : Io(#[from] io::Error),
114 : }
115 :
116 : pub type Result<T> = result::Result<T, DiskBtreeError>;
117 :
118 : /// This is the on-disk representation of a single tree node, stored in one PAGE_SZ page.
119 : struct OnDiskNode<'a, const L: usize> {
120 : // Fixed-width fields
121 : num_children: u16,
122 : level: u8,
123 : prefix_len: u8,
124 : suffix_len: u8,
125 :
126 : // Variable-length fields. These are stored on-disk after the fixed-width
127 : // fields, in this order. In the in-memory representation, these point to
128 : // the right parts in the page buffer.
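//
// On-disk layout of one page (derived from `deparse`/`pack` below):
//
//   num_children: u16 (big-endian)
//   level:        u8
//   prefix_len:   u8
//   suffix_len:   u8
//   prefix:       prefix_len bytes
//   keys:         num_children * suffix_len bytes
//   values:       num_children * VALUE_SZ bytes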
129 : prefix: &'a [u8],
130 : keys: &'a [u8],
131 : values: &'a [u8],
132 : }
133 :
134 : impl<'a, const L: usize> OnDiskNode<'a, L> {
135 : ///
136 : /// Interpret a PAGE_SZ page as a node.
137 : ///
138 1539517 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
139 1539517 : let mut cursor = std::io::Cursor::new(buf);
140 1539517 : let num_children = cursor.read_u16::<BE>()?;
141 1539517 : let level = cursor.read_u8()?;
142 1539517 : let prefix_len = cursor.read_u8()?;
143 1539517 : let suffix_len = cursor.read_u8()?;
144 :
145 1539517 : let mut off = cursor.position();
146 1539517 : let prefix_off = off as usize;
147 1539517 : off += prefix_len as u64;
148 1539517 :
149 1539517 : let keys_off = off as usize;
150 1539517 : let keys_len = num_children as usize * suffix_len as usize;
151 1539517 : off += keys_len as u64;
152 1539517 :
153 1539517 : let values_off = off as usize;
154 1539517 : let values_len = num_children as usize * VALUE_SZ;
155 1539517 : //off += values_len as u64;
156 1539517 :
157 1539517 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
158 1539517 : let keys = &buf[keys_off..keys_off + keys_len];
159 1539517 : let values = &buf[values_off..values_off + values_len];
160 1539517 :
161 1539517 : Ok(OnDiskNode {
162 1539517 : num_children,
163 1539517 : level,
164 1539517 : prefix_len,
165 1539517 : suffix_len,
166 1539517 : prefix,
167 1539517 : keys,
168 1539517 : values,
169 1539517 : })
170 1539517 : }
171 :
172 : ///
173 : /// Read a value at 'idx'
174 : ///
175 3467941 : fn value(&self, idx: usize) -> Value {
176 3467941 : let value_off = idx * VALUE_SZ;
177 3467941 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
178 3467941 : Value::from_slice(value_slice)
179 3467941 : }
180 :
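/// Binary-search this node for `search_key`.
///
/// `keybuf` must already contain the node's prefix; the suffix portion is
/// overwritten on every probe. Returns `Ok(idx)` on an exact match, or
/// `Err(idx)` where `idx` is the position of the first key greater than
/// `search_key`.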
181 1329208 : fn binary_search(
182 1329208 : &self,
183 1329208 : search_key: &[u8; L],
184 1329208 : keybuf: &mut [u8],
185 1329208 : ) -> result::Result<usize, usize> {
186 1329208 : let mut size = self.num_children as usize;
187 1329208 : let mut low = 0;
188 1329208 : let mut high = size;
189 10213040 : while low < high {
190 9096488 : let mid = low + size / 2;
191 9096488 :
192 9096488 : let key_off = mid * self.suffix_len as usize;
193 9096488 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
194 9096488 : // Does this match?
195 9096488 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
196 9096488 :
197 9096488 : let cmp = keybuf[..].cmp(search_key);
198 9096488 :
199 9096488 : if cmp == Ordering::Less {
200 5894255 : low = mid + 1;
201 5894255 : } else if cmp == Ordering::Greater {
202 2989577 : high = mid;
203 2989577 : } else {
204 212656 : return Ok(mid);
205 : }
206 8883832 : size = high - low;
207 : }
208 1116552 : Err(low)
209 1329208 : }
210 : }
211 :
212 : ///
213 : /// Public reader object, to search the tree.
214 : ///
215 : pub struct DiskBtreeReader<R, const L: usize>
216 : where
217 : R: BlockReader,
218 : {
219 : start_blk: u32,
220 : root_blk: u32,
221 : reader: R,
222 : }
223 :
224 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
225 : pub enum VisitDirection {
226 : Forwards,
227 : Backwards,
228 : }
229 :
230 : impl<R, const L: usize> DiskBtreeReader<R, L>
231 : where
232 : R: BlockReader,
233 : {
234 211305 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
235 211305 : DiskBtreeReader {
236 211305 : start_blk,
237 211305 : root_blk,
238 211305 : reader,
239 211305 : }
240 211305 : }
241 :
242 : ///
243 : /// Read the value for the given key. Returns the value, or None if the key doesn't exist.
244 : ///
245 410056 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
246 410056 : let mut result: Option<u64> = None;
247 410056 : self.visit(
248 410056 : search_key,
249 410056 : VisitDirection::Forwards,
250 410056 : |key, value| {
251 210032 : if key == search_key {
252 208022 : result = Some(value);
253 208022 : }
254 210032 : false
255 410056 : },
256 410056 : ctx,
257 410056 : )
258 401 : .await?;
259 410056 : Ok(result)
260 410056 : }
261 :
262 : /// Return a stream which yields all key-value pairs from the index,
263 : /// starting from the first key greater than or equal to `start_key`.
264 : ///
265 : /// Note that this is a copy of [`Self::visit`].
266 : /// TODO: Once the sequential read path is removed this will become
267 : /// the only index traversal method.
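///
/// A sketch of how a caller might consume the stream (assuming `pin_mut!` and
/// `TryStreamExt` from the `futures` crate are in scope; `reader`, `start_key`
/// and `ctx` are illustrative):
///
/// ```ignore
/// use futures::{pin_mut, TryStreamExt};
///
/// let stream = reader.get_stream_from(&start_key, &ctx);
/// pin_mut!(stream);
/// while let Some((key, value)) = stream.try_next().await? {
///     // `key` is the full, prefix-expanded key; `value` is the stored u64
/// }
/// ```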
268 64566 : pub fn get_stream_from<'a>(
269 64566 : &'a self,
270 64566 : start_key: &'a [u8; L],
271 64566 : ctx: &'a RequestContext,
272 64566 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
273 : try_stream! {
274 : let mut stack = Vec::new();
275 : stack.push((self.root_blk, None));
276 : let block_cursor = self.reader.block_cursor();
277 : while let Some((node_blknum, opt_iter)) = stack.pop() {
278 : // Locate the node.
279 : let node_buf = block_cursor
280 : .read_blk(self.start_blk + node_blknum, ctx)
281 : .await?;
282 :
283 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
284 : let prefix_len = node.prefix_len as usize;
285 : let suffix_len = node.suffix_len as usize;
286 :
287 : assert!(node.num_children > 0);
288 :
289 : let mut keybuf = Vec::new();
290 : keybuf.extend(node.prefix);
291 : keybuf.resize(prefix_len + suffix_len, 0);
292 :
293 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
294 : iter
295 : } else {
296 : // Locate the first match
297 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
298 : Ok(idx) => idx,
299 : Err(idx) => {
300 : if node.level == 0 {
301 : // Imagine that the node contains the following keys:
302 : //
303 : // 1
304 : // 3 <-- idx
305 : // 5
306 : //
307 : // If the search key is '2' and there is no exact match,
308 : // the binary search returns the index of key '3'.
309 : // That's fine: '3' is the first key to return.
310 : idx
311 : } else {
312 : // This is an internal page, so each key represents a lower
313 : // bound for what's in the child page. If there is no exact
314 : // match, we have to return the *previous* entry.
315 : //
316 : // 1 <-- return this
317 : // 3 <-- idx
318 : // 5
319 : idx.saturating_sub(1)
320 : }
321 : }
322 : };
323 : Either::Left(idx..node.num_children.into())
324 : };
325 :
326 : // idx points to the first match now. Keep going from there
327 : while let Some(idx) = iter.next() {
328 : let key_off = idx * suffix_len;
329 : let suffix = &node.keys[key_off..key_off + suffix_len];
330 : keybuf[prefix_len..].copy_from_slice(suffix);
331 : let value = node.value(idx);
332 : #[allow(clippy::collapsible_if)]
333 : if node.level == 0 {
334 : // leaf
335 : yield (keybuf.clone(), value.to_u64());
336 : } else {
337 : stack.push((node_blknum, Some(iter)));
338 : stack.push((value.to_blknum(), None));
339 : break;
340 : }
341 : }
342 : }
343 : }
344 64566 : }
345 :
346 : ///
347 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
348 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
349 : /// backwards), until the visitor returns 'false'.
350 : ///
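///
/// A sketch of collecting the first ten entries at or after a key; returning
/// 'false' from the visitor stops the scan early (`reader`, `start_key` and
/// `ctx` are illustrative):
///
/// ```ignore
/// let mut found = Vec::new();
/// reader
///     .visit(&start_key, VisitDirection::Forwards, |key, value| {
///         found.push((key.to_vec(), value));
///         found.len() < 10 // keep scanning until we have ten entries
///     }, &ctx)
///     .await?;
/// ```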
351 621878 : pub async fn visit<V>(
352 621878 : &self,
353 621878 : search_key: &[u8; L],
354 621878 : dir: VisitDirection,
355 621878 : mut visitor: V,
356 621878 : ctx: &RequestContext,
357 621878 : ) -> Result<bool>
358 621878 : where
359 621878 : V: FnMut(&[u8], u64) -> bool,
360 621878 : {
361 621878 : let mut stack = Vec::new();
362 621878 : stack.push((self.root_blk, None));
363 621878 : let block_cursor = self.reader.block_cursor();
364 1626526 : while let Some((node_blknum, opt_iter)) = stack.pop() {
365 : // Locate the node.
366 1426072 : let node_buf = block_cursor
367 1426072 : .read_blk(self.start_blk + node_blknum, ctx)
368 23050 : .await?;
369 :
370 1426072 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
371 1426072 : let prefix_len = node.prefix_len as usize;
372 1426072 : let suffix_len = node.suffix_len as usize;
373 1426072 :
374 1426072 : assert!(node.num_children > 0);
375 :
376 1426072 : let mut keybuf = Vec::new();
377 1426072 : keybuf.extend(node.prefix);
378 1426072 : keybuf.resize(prefix_len + suffix_len, 0);
379 :
380 1426072 : let mut iter = if let Some(iter) = opt_iter {
381 203898 : iter
382 1222174 : } else if dir == VisitDirection::Forwards {
383 : // Locate the first match
384 817381 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
385 210230 : Ok(idx) => idx,
386 607151 : Err(idx) => {
387 607151 : if node.level == 0 {
388 : // Imagine that the node contains the following keys:
389 : //
390 : // 1
391 : // 3 <-- idx
392 : // 5
393 : //
394 : // If the search key is '2' and there is no exact match,
395 : // the binary search returns the index of key '3'.
396 : // That's fine: '3' is the first key to return.
397 208040 : idx
398 : } else {
399 : // This is an internal page, so each key represents a lower
400 : // bound for what's in the child page. If there is no exact
401 : // match, we have to return the *previous* entry.
402 : //
403 : // 1 <-- return this
404 : // 3 <-- idx
405 : // 5
406 399111 : idx.saturating_sub(1)
407 : }
408 : }
409 : };
410 817381 : Either::Left(idx..node.num_children.into())
411 : } else {
412 404793 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
413 2223 : Ok(idx) => {
414 2223 : // Exact match. That's the first entry to return, and walk
415 2223 : // backwards from there.
416 2223 : idx
417 : }
418 402570 : Err(idx) => {
419 : // No exact match. The binary search returned the index of the
420 : // first key that's > search_key. Back off by one, and walk
421 : // backwards from there.
422 402570 : if let Some(idx) = idx.checked_sub(1) {
423 396645 : idx
424 : } else {
425 5925 : return Ok(false);
426 : }
427 : }
428 : };
429 398868 : Either::Right((0..=idx).rev())
430 : };
431 :
432 : // idx points to the first match now. Keep going from there
433 3564101 : while let Some(idx) = iter.next() {
434 3159749 : let key_off = idx * suffix_len;
435 3159749 : let suffix = &node.keys[key_off..key_off + suffix_len];
436 3159749 : keybuf[prefix_len..].copy_from_slice(suffix);
437 3159749 : let value = node.value(idx);
438 3159749 : #[allow(clippy::collapsible_if)]
439 3159749 : if node.level == 0 {
440 : // leaf
441 2559453 : if !visitor(&keybuf, value.to_u64()) {
442 415499 : return Ok(false);
443 2143954 : }
444 : } else {
445 600296 : stack.push((node_blknum, Some(iter)));
446 600296 : stack.push((value.to_blknum(), None));
447 600296 : break;
448 : }
449 : }
450 : }
451 200454 : Ok(true)
452 621878 : }
453 :
454 : #[allow(dead_code)]
455 10 : pub async fn dump(&self) -> Result<()> {
456 10 : let mut stack = Vec::new();
457 10 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
458 10 :
459 10 : stack.push((self.root_blk, String::new(), 0, 0, 0));
460 10 :
461 10 : let block_cursor = self.reader.block_cursor();
462 :
463 6042 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
464 6032 : let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
465 6032 : let buf: &[u8] = blk.as_ref();
466 6032 : let node = OnDiskNode::<L>::deparse(buf)?;
467 :
468 6032 : if child_idx == 0 {
469 18 : print!("{:indent$}", "", indent = depth * 2);
470 18 : let path_prefix = stack
471 18 : .iter()
472 18 : .map(|(_blknum, path, ..)| path.as_str())
473 18 : .collect::<String>();
474 18 : println!(
475 18 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
476 18 : hex::encode(node.prefix),
477 18 : node.suffix_len
478 18 : );
479 6014 : }
480 :
481 6032 : if child_idx + 1 < node.num_children {
482 6014 : let key_off = key_off + node.suffix_len as usize;
483 6014 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
484 6014 : }
485 6032 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
486 6032 : let val = node.value(child_idx as usize);
487 6032 :
488 6032 : print!("{:indent$}", "", indent = depth * 2 + 2);
489 6032 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
490 6032 :
491 6032 : if node.level > 0 {
492 8 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
493 6024 : }
494 : }
495 10 : Ok(())
496 10 : }
497 : }
498 :
499 : ///
500 : /// Public builder object, for creating a new tree.
501 : ///
502 : /// Usage: Create a builder object by calling 'new', load all the data into the
503 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
504 : ///
505 : /// 'L' is the key length in bytes
506 : pub struct DiskBtreeBuilder<W, const L: usize>
507 : where
508 : W: BlockWriter,
509 : {
510 : writer: W,
511 :
512 : ///
513 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
514 : ///
515 : /// We maintain the invariant that the stack always contains at least one node.
516 : /// There are two exceptions:
517 : /// 1. `Self::flush_node`: if it pops the last remaining node, it pushes a new one,
518 : /// so other methods never observe the intermediate state and the invariant holds.
519 : /// 2. `Self::finish`: it consumes `self` and does not return it,
520 : /// which means that this is where the structure is destroyed.
521 : /// Thus an empty stack can never be observed by other methods.
522 : stack: Vec<BuildNode<L>>,
523 :
524 : /// Last key that was appended to the tree. Used to sanity check that append
525 : /// is called in increasing key order.
526 : last_key: Option<[u8; L]>,
527 : }
528 :
529 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
530 : where
531 : W: BlockWriter,
532 : {
533 1656 : pub fn new(writer: W) -> Self {
534 1656 : DiskBtreeBuilder {
535 1656 : writer,
536 1656 : last_key: None,
537 1656 : stack: vec![BuildNode::new(0)],
538 1656 : }
539 1656 : }
540 :
541 6662865 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
542 6662865 : if value > MAX_VALUE {
543 0 : return Err(DiskBtreeError::AppendOverflow(value));
544 6662865 : }
545 6662865 : if let Some(last_key) = &self.last_key {
546 6661559 : if key <= last_key {
547 2 : return Err(DiskBtreeError::UnsortedInput {
548 2 : key: key.as_slice().into(),
549 2 : last_key: last_key.as_slice().into(),
550 2 : });
551 6661557 : }
552 1306 : }
553 6662863 : self.last_key = Some(*key);
554 6662863 :
555 6662863 : self.append_internal(key, Value::from_u64(value))
556 6662865 : }
557 :
558 6675284 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
559 6675284 : // Try to append to the current leaf buffer
560 6675284 : let last = self
561 6675284 : .stack
562 6675284 : .last_mut()
563 6675284 : .expect("should always have at least one item");
564 6675284 : let level = last.level;
565 6675284 : if last.push(key, value) {
566 6651674 : return Ok(());
567 23610 : }
568 23610 :
569 23610 : // It did not fit. Try to compress, and if it succeeds to make
570 23610 : // some room on the node, try appending to it again.
571 23610 : #[allow(clippy::collapsible_if)]
572 23610 : if last.compress() {
573 11981 : if last.push(key, value) {
574 11969 : return Ok(());
575 12 : }
576 11629 : }
577 :
578 : // Could not append to the current leaf. Flush it and create a new one.
579 11641 : self.flush_node()?;
580 :
581 : // Replace the node we flushed with an empty one and append the new
582 : // key to it.
583 11641 : let mut last = BuildNode::new(level);
584 11641 : if !last.push(key, value) {
585 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
586 11641 : }
587 11641 :
588 11641 : self.stack.push(last);
589 11641 :
590 11641 : Ok(())
591 6675284 : }
592 :
593 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
594 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
595 : /// creates a new root page, increasing the height of the tree.
596 12421 : fn flush_node(&mut self) -> Result<()> {
597 12421 : // Get the current bottommost node in the stack and flush it to disk.
598 12421 : let last = self
599 12421 : .stack
600 12421 : .pop()
601 12421 : .expect("should always have at least one item");
602 12421 : let buf = last.pack();
603 12421 : let downlink_key = last.first_key();
604 12421 : let downlink_ptr = self.writer.write_blk(buf)?;
605 :
606 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
607 : // create a new root page, increasing the height of the tree.
608 12421 : if self.stack.is_empty() {
609 780 : self.stack.push(BuildNode::new(last.level + 1));
610 11641 : }
611 12421 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
612 12421 : }
613 :
614 : ///
615 : /// Flushes everything to disk, and returns the block number of the root page.
616 : /// The caller must store the root block number "out-of-band", and pass it
617 : /// to DiskBtreeReader::new() when the tree is read again.
618 : /// (In the image and delta layers, it is stored at the beginning of the file,
619 : /// in the summary header.)
620 : ///
621 1304 : pub fn finish(mut self) -> Result<(u32, W)> {
622 : // flush all levels, except the root.
623 2084 : while self.stack.len() > 1 {
624 780 : self.flush_node()?;
625 : }
626 :
627 1304 : let root = self
628 1304 : .stack
629 1304 : .first()
630 1304 : .expect("by the check above we left one item there");
631 1304 : let buf = root.pack();
632 1304 : let root_blknum = self.writer.write_blk(buf)?;
633 :
634 1304 : Ok((root_blknum, self.writer))
635 1304 : }
636 :
637 2023972 : pub fn borrow_writer(&self) -> &W {
638 2023972 : &self.writer
639 2023972 : }
640 : }
641 :
642 : ///
643 : /// BuildNode represents an incomplete page that we are appending to.
644 : ///
645 : #[derive(Clone, Debug)]
646 : struct BuildNode<const L: usize> {
647 : num_children: u16,
648 : level: u8,
649 : prefix: Vec<u8>,
650 : suffix_len: usize,
651 :
652 : keys: Vec<u8>,
653 : values: Vec<u8>,
654 :
655 : size: usize, // physical size of this node, if it was written to disk like this
656 : }
657 :
658 : const NODE_SIZE: usize = PAGE_SZ;
659 :
660 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
661 :
662 : impl<const L: usize> BuildNode<L> {
663 14077 : fn new(level: u8) -> Self {
664 14077 : BuildNode {
665 14077 : num_children: 0,
666 14077 : level,
667 14077 : prefix: Vec::new(),
668 14077 : suffix_len: 0,
669 14077 : keys: Vec::new(),
670 14077 : values: Vec::new(),
671 14077 : size: NODE_HDR_SIZE,
672 14077 : }
673 14077 : }
674 :
675 : /// Try to append a key-value pair to this node. Returns 'true' on
676 : /// success, 'false' if the page was full or the key was
677 : /// incompatible with the prefix of the existing keys.
678 6698906 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
679 6698906 : // If we have already performed prefix-compression on the page,
680 6698906 : // check that the incoming key has the same prefix.
681 6698906 : if self.num_children > 0 {
682 : // does the prefix allow it?
683 6685179 : if !key.starts_with(&self.prefix) {
684 244 : return false;
685 6684935 : }
686 13727 : } else {
687 13727 : self.suffix_len = key.len();
688 13727 : }
689 :
690 : // Is the node too full?
691 6698662 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
692 23378 : return false;
693 6675284 : }
694 6675284 :
695 6675284 : // All clear
696 6675284 : self.num_children += 1;
697 6675284 : self.keys.extend(&key[self.prefix.len()..]);
698 6675284 : self.values.extend(value.0);
699 6675284 :
700 6675284 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
701 6675284 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
702 :
703 6675284 : self.size += self.suffix_len + VALUE_SZ;
704 6675284 :
705 6675284 : true
706 6698906 : }
707 :
708 : ///
709 : /// Perform prefix-compression.
710 : ///
711 : /// Returns 'true' on success, 'false' if no compression was possible.
712 : ///
713 23610 : fn compress(&mut self) -> bool {
714 23610 : let first_suffix = self.first_suffix();
715 23610 : let last_suffix = self.last_suffix();
716 23610 :
717 23610 : // Find the common prefix among all keys
718 23610 : let mut prefix_len = 0;
719 214658 : while prefix_len < self.suffix_len {
720 214658 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
721 23610 : break;
722 191048 : }
723 191048 : prefix_len += 1;
724 : }
725 23610 : if prefix_len == 0 {
726 11629 : return false;
727 11981 : }
728 11981 :
729 11981 : // Can compress. Rewrite the keys without the common prefix.
730 11981 : self.prefix.extend(&self.keys[..prefix_len]);
731 11981 :
732 11981 : let mut new_keys = Vec::new();
733 11981 : let mut key_off = 0;
734 3201355 : while key_off < self.keys.len() {
735 3189374 : let next_key_off = key_off + self.suffix_len;
736 3189374 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
737 3189374 : key_off = next_key_off;
738 3189374 : }
739 11981 : self.keys = new_keys;
740 11981 : self.suffix_len -= prefix_len;
741 11981 :
742 11981 : self.size -= prefix_len * self.num_children as usize;
743 11981 : self.size += prefix_len;
744 11981 :
745 11981 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
746 11981 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
747 :
748 11981 : true
749 23610 : }
750 :
751 : ///
752 : /// Serialize the node to on-disk format.
753 : ///
754 13725 : fn pack(&self) -> Bytes {
755 13725 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
756 13725 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
757 13725 : assert!(self.num_children > 0);
758 :
759 13725 : let mut buf = BytesMut::new();
760 13725 :
761 13725 : buf.put_u16(self.num_children);
762 13725 : buf.put_u8(self.level);
763 13725 : buf.put_u8(self.prefix.len() as u8);
764 13725 : buf.put_u8(self.suffix_len as u8);
765 13725 : buf.put(&self.prefix[..]);
766 13725 : buf.put(&self.keys[..]);
767 13725 : buf.put(&self.values[..]);
768 13725 :
769 13725 : assert!(buf.len() == self.size);
770 :
771 13725 : assert!(buf.len() <= PAGE_SZ);
772 13725 : buf.resize(PAGE_SZ, 0);
773 13725 : buf.freeze()
774 13725 : }
775 :
776 36031 : fn first_suffix(&self) -> &[u8] {
777 36031 : &self.keys[..self.suffix_len]
778 36031 : }
779 23610 : fn last_suffix(&self) -> &[u8] {
780 23610 : &self.keys[self.keys.len() - self.suffix_len..]
781 23610 : }
782 :
783 : /// Return the full first key of the page, including the prefix
784 12421 : fn first_key(&self) -> [u8; L] {
785 12421 : let mut key = [0u8; L];
786 12421 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
787 12421 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
788 12421 : key
789 12421 : }
790 : }
791 :
792 : #[cfg(test)]
793 : pub(crate) mod tests {
794 : use super::*;
795 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
796 : use rand::Rng;
797 : use std::collections::BTreeMap;
798 : use std::sync::atomic::{AtomicUsize, Ordering};
799 :
800 : #[derive(Clone, Default)]
801 : pub(crate) struct TestDisk {
802 : blocks: Vec<Bytes>,
803 : }
804 : impl TestDisk {
805 10 : fn new() -> Self {
806 10 : Self::default()
807 10 : }
808 1016292 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
809 1016292 : let mut buf = [0u8; PAGE_SZ];
810 1016292 : buf.copy_from_slice(&self.blocks[blknum as usize]);
811 1016292 : Ok(std::sync::Arc::new(buf).into())
812 1016292 : }
813 : }
814 : impl BlockReader for TestDisk {
815 411181 : fn block_cursor(&self) -> BlockCursor<'_> {
816 411181 : BlockCursor::new(BlockReaderRef::TestDisk(self))
817 411181 : }
818 : }
819 : impl BlockWriter for &mut TestDisk {
820 216 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
821 216 : let blknum = self.blocks.len();
822 216 : self.blocks.push(buf);
823 216 : Ok(blknum as u32)
824 216 : }
825 : }
826 :
827 : #[tokio::test]
828 2 : async fn basic() -> Result<()> {
829 2 : let mut disk = TestDisk::new();
830 2 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
831 2 :
832 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
833 2 :
834 2 : let all_keys: Vec<&[u8; 6]> = vec![
835 2 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
836 2 : ];
837 2 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
838 2 : .iter()
839 2 : .enumerate()
840 16 : .map(|(idx, key)| (*key, idx as u64))
841 2 : .collect();
842 16 : for (key, val) in all_data.iter() {
843 16 : writer.append(key, *val)?;
844 2 : }
845 2 :
846 2 : let (root_offset, _writer) = writer.finish()?;
847 2 :
848 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
849 2 :
850 2 : reader.dump().await?;
851 2 :
852 2 : // Test the `get` function on all the keys.
853 16 : for (key, val) in all_data.iter() {
854 16 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
855 2 : }
856 2 : // And on some keys that don't exist
857 2 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
858 2 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
859 2 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
860 2 :
861 2 : // Test search with `visit` function
862 2 : let search_key = b"xabaaa";
863 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
864 2 : .iter()
865 16 : .filter(|(key, _value)| key[..] >= search_key[..])
866 10 : .map(|(key, value)| (key.to_vec(), *value))
867 2 : .collect();
868 2 :
869 2 : let mut data = Vec::new();
870 2 : reader
871 2 : .visit(
872 2 : search_key,
873 2 : VisitDirection::Forwards,
874 10 : |key, value| {
875 10 : data.push((key.to_vec(), value));
876 10 : true
877 10 : },
878 2 : &ctx,
879 2 : )
880 2 : .await?;
881 2 : assert_eq!(data, expected);
882 2 :
883 2 : // Test a backwards scan
884 2 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
885 2 : .iter()
886 16 : .filter(|(key, _value)| key[..] <= search_key[..])
887 8 : .map(|(key, value)| (key.to_vec(), *value))
888 2 : .collect();
889 2 : expected.reverse();
890 2 : let mut data = Vec::new();
891 2 : reader
892 2 : .visit(
893 2 : search_key,
894 2 : VisitDirection::Backwards,
895 8 : |key, value| {
896 8 : data.push((key.to_vec(), value));
897 8 : true
898 8 : },
899 2 : &ctx,
900 2 : )
901 2 : .await?;
902 2 : assert_eq!(data, expected);
903 2 :
904 2 : // Backward scan where nothing matches
905 2 : reader
906 2 : .visit(
907 2 : b"aaaaaa",
908 2 : VisitDirection::Backwards,
909 2 : |key, value| {
910 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
911 2 : },
912 2 : &ctx,
913 2 : )
914 2 : .await?;
915 2 :
916 2 : // Full scan
917 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
918 2 : .iter()
919 16 : .map(|(key, value)| (key.to_vec(), *value))
920 2 : .collect();
921 2 : let mut data = Vec::new();
922 2 : reader
923 2 : .visit(
924 2 : &[0u8; 6],
925 2 : VisitDirection::Forwards,
926 16 : |key, value| {
927 16 : data.push((key.to_vec(), value));
928 16 : true
929 16 : },
930 2 : &ctx,
931 2 : )
932 2 : .await?;
933 2 : assert_eq!(data, expected);
934 2 :
935 2 : Ok(())
936 2 : }
937 :
938 : #[tokio::test]
939 2 : async fn lots_of_keys() -> Result<()> {
940 2 : let mut disk = TestDisk::new();
941 2 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
942 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
943 2 :
944 2 : const NUM_KEYS: u64 = 1000;
945 2 :
946 2 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
947 2 :
948 2002 : for idx in 0..NUM_KEYS {
949 2000 : let key_int: u64 = 1 + idx * 2;
950 2000 : let key = u64::to_be_bytes(key_int);
951 2000 : writer.append(&key, idx)?;
952 2 :
953 2000 : all_data.insert(key_int, idx);
954 2 : }
955 2 :
956 2 : let (root_offset, _writer) = writer.finish()?;
957 2 :
958 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
959 2 :
960 2 : reader.dump().await?;
961 2 :
962 2 : use std::sync::Mutex;
963 2 :
964 2 : let result = Mutex::new(Vec::new());
965 2 : let limit: AtomicUsize = AtomicUsize::new(10);
966 83820 : let take_ten = |key: &[u8], value: u64| {
967 83820 : let mut keybuf = [0u8; 8];
968 83820 : keybuf.copy_from_slice(key);
969 83820 : let key_int = u64::from_be_bytes(keybuf);
970 83820 :
971 83820 : let mut result = result.lock().unwrap();
972 83820 : result.push((key_int, value));
973 83820 :
974 83820 : // keep going until we have 10 matches
975 83820 : result.len() < limit.load(Ordering::Relaxed)
976 83820 : };
977 2 :
978 4020 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
979 4020 : let search_key = u64::to_be_bytes(search_key_int);
980 4020 : assert_eq!(
981 4020 : reader.get(&search_key, &ctx).await?,
982 4020 : all_data.get(&search_key_int).cloned()
983 2 : );
984 2 :
985 2 : // Test a forward scan starting with this key
986 4020 : result.lock().unwrap().clear();
987 4020 : reader
988 4020 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
989 2 : .await?;
990 4020 : let expected = all_data
991 4020 : .range(search_key_int..)
992 4020 : .take(10)
993 39820 : .map(|(&key, &val)| (key, val))
994 4020 : .collect::<Vec<(u64, u64)>>();
995 4020 : assert_eq!(*result.lock().unwrap(), expected);
996 2 :
997 2 : // And a backwards scan
998 4020 : result.lock().unwrap().clear();
999 4020 : reader
1000 4020 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1001 2 : .await?;
1002 4020 : let expected = all_data
1003 4020 : .range(..=search_key_int)
1004 4020 : .rev()
1005 4020 : .take(10)
1006 40000 : .map(|(&key, &val)| (key, val))
1007 4020 : .collect::<Vec<(u64, u64)>>();
1008 4020 : assert_eq!(*result.lock().unwrap(), expected);
1009 2 : }
1010 2 :
1011 2 : // full scan
1012 2 : let search_key = u64::to_be_bytes(0);
1013 2 : limit.store(usize::MAX, Ordering::Relaxed);
1014 2 : result.lock().unwrap().clear();
1015 2 : reader
1016 2 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1017 2 : .await?;
1018 2 : let expected = all_data
1019 2 : .iter()
1020 2000 : .map(|(&key, &val)| (key, val))
1021 2 : .collect::<Vec<(u64, u64)>>();
1022 2 : assert_eq!(*result.lock().unwrap(), expected);
1023 2 :
1024 2 : // full scan
1025 2 : let search_key = u64::to_be_bytes(u64::MAX);
1026 2 : limit.store(usize::MAX, Ordering::Relaxed);
1027 2 : result.lock().unwrap().clear();
1028 2 : reader
1029 2 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1030 2 : .await?;
1031 2 : let expected = all_data
1032 2 : .iter()
1033 2 : .rev()
1034 2000 : .map(|(&key, &val)| (key, val))
1035 2 : .collect::<Vec<(u64, u64)>>();
1036 2 : assert_eq!(*result.lock().unwrap(), expected);
1037 2 :
1038 2 : Ok(())
1039 2 : }
1040 :
1041 : #[tokio::test]
1042 2 : async fn random_data() -> Result<()> {
1043 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1044 2 :
1045 2 : // Generate random keys with exponential distribution, to
1046 2 : // exercise the prefix compression
1047 2 : const NUM_KEYS: usize = 100000;
1048 2 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1049 200002 : for idx in 0..NUM_KEYS {
1050 200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1051 200000 : let t = -(f64::ln(u));
1052 200000 : let key_int = (t * 1000000.0) as u128;
1053 200000 :
1054 200000 : all_data.insert(key_int, idx as u64);
1055 200000 : }
1056 2 :
1057 2 : // Build a tree from it
1058 2 : let mut disk = TestDisk::new();
1059 2 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1060 2 :
1061 195071 : for (&key, &val) in all_data.iter() {
1062 195071 : writer.append(&u128::to_be_bytes(key), val)?;
1063 2 : }
1064 2 : let (root_offset, _writer) = writer.finish()?;
1065 2 :
1066 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1067 2 :
1068 2 : // Test get() operation on all the keys
1069 195071 : for (&key, &val) in all_data.iter() {
1070 195071 : let search_key = u128::to_be_bytes(key);
1071 195071 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1072 2 : }
1073 2 :
1074 2 : // Test get() operations on random keys, most of which will not exist
1075 200002 : for _ in 0..100000 {
1076 200000 : let key_int = rand::thread_rng().gen::<u128>();
1077 200000 : let search_key = u128::to_be_bytes(key_int);
1078 200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1079 2 : }
1080 2 :
1081 2 : // Test boundary cases
1082 2 : assert!(
1083 2 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1084 2 : == all_data.get(&u128::MIN).cloned()
1085 2 : );
1086 2 : assert!(
1087 2 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1088 2 : == all_data.get(&u128::MAX).cloned()
1089 2 : );
1090 2 :
1091 2 : Ok(())
1092 2 : }
1093 :
1094 : #[test]
1095 2 : fn unsorted_input() {
1096 2 : let mut disk = TestDisk::new();
1097 2 : let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
1098 2 :
1099 2 : let _ = writer.append(b"ba", 1);
1100 2 : let _ = writer.append(b"bb", 2);
1101 2 : let err = writer.append(b"aa", 3).expect_err("should've failed");
1102 2 : match err {
1103 2 : DiskBtreeError::UnsortedInput { key, last_key } => {
1104 2 : assert_eq!(key.as_ref(), b"aa".as_slice());
1105 2 : assert_eq!(last_key.as_ref(), b"bb".as_slice());
1106 : }
1107 0 : _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
1108 : }
1109 2 : }
1110 :
1111 : ///
1112 : /// This test contains a particular data set, see disk_btree_test_data.rs
1113 : ///
1114 : #[tokio::test]
1115 2 : async fn particular_data() -> Result<()> {
1116 2 : // Build a tree from it
1117 2 : let mut disk = TestDisk::new();
1118 2 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1119 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1120 2 :
1121 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1122 4000 : writer.append(&key, val)?;
1123 2 : }
1124 2 : let (root_offset, writer) = writer.finish()?;
1125 2 :
1126 2 : println!("SIZE: {} blocks", writer.blocks.len());
1127 2 :
1128 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1129 2 :
1130 2 : // Test get() operation on all the keys
1131 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1132 4000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1133 2 : }
1134 2 :
1135 2 : // Test full scan
1136 2 : let mut count = 0;
1137 2 : reader
1138 2 : .visit(
1139 2 : &[0u8; 26],
1140 2 : VisitDirection::Forwards,
1141 4000 : |_key, _value| {
1142 4000 : count += 1;
1143 4000 : true
1144 4000 : },
1145 2 : &ctx,
1146 2 : )
1147 2 : .await?;
1148 2 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1149 2 :
1150 2 : reader.dump().await?;
1151 2 :
1152 2 : Ok(())
1153 2 : }
1154 : }
1155 :
1156 : #[cfg(test)]
1157 : #[path = "disk_btree_test_data.rs"]
1158 : mod disk_btree_test_data;