Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use async_stream::try_stream;
22 : use byteorder::{ReadBytesExt, BE};
23 : use bytes::{BufMut, Bytes, BytesMut};
24 : use either::Either;
25 : use futures::{Stream, StreamExt};
26 : use hex;
27 : use std::{
28 : cmp::Ordering,
29 : io,
30 : iter::Rev,
31 : ops::{Range, RangeInclusive},
32 : result,
33 : };
34 : use thiserror::Error;
35 : use tracing::error;
36 :
37 : use crate::{
38 : context::{DownloadBehavior, RequestContext},
39 : task_mgr::TaskKind,
40 : tenant::block_io::{BlockReader, BlockWriter},
41 : };
42 :
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
/// Largest value accepted by `DiskBtreeBuilder::append`. The most significant bit of
/// the 5-byte encoding is kept clear so that it can tag child block numbers instead.
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;

pub const PAGE_SZ: usize = 8192;

/// A fixed-width (`VALUE_SZ` bytes, big-endian) value stored in a B-tree node.
///
/// Two encodings share this representation:
/// - offsets built by `Value::from_u64`: at most `MAX_VALUE`, so the top bit is clear;
/// - child block numbers built by `Value::from_blknum`: a leading `0x80` tag byte
///   followed by the 32-bit block number.
#[derive(Clone, Copy, Debug)]
struct Value([u8; VALUE_SZ]);

impl Value {
    /// Copy a raw `VALUE_SZ`-byte slice (as stored in a node page) into a `Value`.
    ///
    /// # Panics
    /// If `slice.len() != VALUE_SZ`.
    fn from_slice(slice: &[u8]) -> Value {
        let mut b = [0u8; VALUE_SZ];
        b.copy_from_slice(slice);
        Value(b)
    }

    /// Encode an offset as the low 5 bytes of `x`, big-endian.
    ///
    /// # Panics
    /// If `x > MAX_VALUE`; larger values would collide with the block-number tag.
    fn from_u64(x: u64) -> Value {
        assert!(x <= MAX_VALUE);
        let b = x.to_be_bytes();
        Value([b[3], b[4], b[5], b[6], b[7]])
    }

    /// Encode a child block number: a `0x80` tag byte followed by `x`, big-endian.
    fn from_blknum(x: u32) -> Value {
        let [b0, b1, b2, b3] = x.to_be_bytes();
        Value([0x80, b0, b1, b2, b3])
    }

    /// Returns `true` if this value is an offset (built by `from_u64`), and `false`
    /// if it is a tagged child block number (built by `from_blknum`).
    #[allow(dead_code)]
    fn is_offset(self) -> bool {
        // Offsets never exceed MAX_VALUE, so their top bit is always clear, while
        // block numbers always carry the 0x80 tag.
        self.0[0] & 0x80 == 0
    }

    /// Decode the 5-byte big-endian integer (the inverse of `from_u64`).
    fn to_u64(self) -> u64 {
        let mut b = [0u8; 8];
        b[3..].copy_from_slice(&self.0);
        u64::from_be_bytes(b)
    }

    /// Decode a value built by `from_blknum`.
    ///
    /// # Panics
    /// If the value does not carry the `0x80` block-number tag.
    fn to_blknum(self) -> u32 {
        let [tag, b0, b1, b2, b3] = self.0;
        assert!(tag == 0x80);
        u32::from_be_bytes([b0, b1, b2, b3])
    }
}
100 :
101 0 : #[derive(Error, Debug)]
102 : pub enum DiskBtreeError {
103 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
104 : AppendOverflow(u64),
105 :
106 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
107 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
108 :
109 : #[error("Could not push to new leaf node")]
110 : FailedToPushToNewLeafNode,
111 :
112 : #[error("IoError: {0}")]
113 : Io(#[from] io::Error),
114 : }
115 :
116 : pub type Result<T> = result::Result<T, DiskBtreeError>;
117 :
118 : /// This is the on-disk representation.
119 : struct OnDiskNode<'a, const L: usize> {
120 : // Fixed-width fields
121 : num_children: u16,
122 : level: u8,
123 : prefix_len: u8,
124 : suffix_len: u8,
125 :
126 : // Variable-length fields. These are stored on-disk after the fixed-width
127 : // fields, in this order. In the in-memory representation, these point to
128 : // the right parts in the page buffer.
129 : prefix: &'a [u8],
130 : keys: &'a [u8],
131 : values: &'a [u8],
132 : }
133 :
134 : impl<'a, const L: usize> OnDiskNode<'a, L> {
135 : ///
136 : /// Interpret a PAGE_SZ page as a node.
137 : ///
138 1543863 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
139 1543863 : let mut cursor = std::io::Cursor::new(buf);
140 1543863 : let num_children = cursor.read_u16::<BE>()?;
141 1543863 : let level = cursor.read_u8()?;
142 1543863 : let prefix_len = cursor.read_u8()?;
143 1543863 : let suffix_len = cursor.read_u8()?;
144 :
145 1543863 : let mut off = cursor.position();
146 1543863 : let prefix_off = off as usize;
147 1543863 : off += prefix_len as u64;
148 1543863 :
149 1543863 : let keys_off = off as usize;
150 1543863 : let keys_len = num_children as usize * suffix_len as usize;
151 1543863 : off += keys_len as u64;
152 1543863 :
153 1543863 : let values_off = off as usize;
154 1543863 : let values_len = num_children as usize * VALUE_SZ;
155 1543863 : //off += values_len as u64;
156 1543863 :
157 1543863 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
158 1543863 : let keys = &buf[keys_off..keys_off + keys_len];
159 1543863 : let values = &buf[values_off..values_off + values_len];
160 1543863 :
161 1543863 : Ok(OnDiskNode {
162 1543863 : num_children,
163 1543863 : level,
164 1543863 : prefix_len,
165 1543863 : suffix_len,
166 1543863 : prefix,
167 1543863 : keys,
168 1543863 : values,
169 1543863 : })
170 1543863 : }
171 :
172 : ///
173 : /// Read a value at 'idx'
174 : ///
175 4677023 : fn value(&self, idx: usize) -> Value {
176 4677023 : let value_off = idx * VALUE_SZ;
177 4677023 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
178 4677023 : Value::from_slice(value_slice)
179 4677023 : }
180 :
181 1332544 : fn binary_search(
182 1332544 : &self,
183 1332544 : search_key: &[u8; L],
184 1332544 : keybuf: &mut [u8],
185 1332544 : ) -> result::Result<usize, usize> {
186 1332544 : let mut size = self.num_children as usize;
187 1332544 : let mut low = 0;
188 1332544 : let mut high = size;
189 10144491 : while low < high {
190 9024764 : let mid = low + size / 2;
191 9024764 :
192 9024764 : let key_off = mid * self.suffix_len as usize;
193 9024764 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
194 9024764 : // Does this match?
195 9024764 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
196 9024764 :
197 9024764 : let cmp = keybuf[..].cmp(search_key);
198 9024764 :
199 9024764 : if cmp == Ordering::Less {
200 5802172 : low = mid + 1;
201 5802172 : } else if cmp == Ordering::Greater {
202 3009775 : high = mid;
203 3009775 : } else {
204 212817 : return Ok(mid);
205 : }
206 8811947 : size = high - low;
207 : }
208 1119727 : Err(low)
209 1332544 : }
210 : }
211 :
212 : ///
213 : /// Public reader object, to search the tree.
214 : ///
215 : pub struct DiskBtreeReader<R, const L: usize>
216 : where
217 : R: BlockReader,
218 : {
219 : start_blk: u32,
220 : root_blk: u32,
221 : reader: R,
222 : }
223 :
224 : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
225 : pub enum VisitDirection {
226 : Forwards,
227 : Backwards,
228 : }
229 :
230 : impl<R, const L: usize> DiskBtreeReader<R, L>
231 : where
232 : R: BlockReader,
233 : {
234 212794 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
235 212794 : DiskBtreeReader {
236 212794 : start_blk,
237 212794 : root_blk,
238 212794 : reader,
239 212794 : }
240 212794 : }
241 :
242 : ///
243 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
244 : ///
245 410203 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
246 410203 : let mut result: Option<u64> = None;
247 410203 : self.visit(
248 410203 : search_key,
249 410203 : VisitDirection::Forwards,
250 410203 : |key, value| {
251 210179 : if key == search_key {
252 208169 : result = Some(value);
253 208169 : }
254 210179 : false
255 410203 : },
256 410203 : ctx,
257 410203 : )
258 417 : .await?;
259 410203 : Ok(result)
260 410203 : }
261 :
262 2 : pub fn iter<'a>(
263 2 : &'a self,
264 2 : start_key: &'a [u8; L],
265 2 : ctx: &'a RequestContext,
266 2 : ) -> DiskBtreeIterator<'a> {
267 2 : DiskBtreeIterator {
268 2 : stream: Box::pin(self.get_stream_from(start_key, ctx)),
269 2 : }
270 2 : }
271 :
272 : /// Return a stream which yields all key, value pairs from the index
273 : /// starting from the first key greater or equal to `start_key`.
274 : ///
275 : /// Note that this is a copy of [`Self::visit`].
276 : /// TODO: Once the sequential read path is removed this will become
277 : /// the only index traversal method.
278 64266 : pub fn get_stream_from<'a>(
279 64266 : &'a self,
280 64266 : start_key: &'a [u8; L],
281 64266 : ctx: &'a RequestContext,
282 64266 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
283 : try_stream! {
284 : let mut stack = Vec::new();
285 : stack.push((self.root_blk, None));
286 : let block_cursor = self.reader.block_cursor();
287 : while let Some((node_blknum, opt_iter)) = stack.pop() {
288 : // Locate the node.
289 : let node_buf = block_cursor
290 : .read_blk(self.start_blk + node_blknum, ctx)
291 : .await?;
292 :
293 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
294 : let prefix_len = node.prefix_len as usize;
295 : let suffix_len = node.suffix_len as usize;
296 :
297 : assert!(node.num_children > 0);
298 :
299 : let mut keybuf = Vec::new();
300 : keybuf.extend(node.prefix);
301 : keybuf.resize(prefix_len + suffix_len, 0);
302 :
303 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
304 : iter
305 : } else {
306 : // Locate the first match
307 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
308 : Ok(idx) => idx,
309 : Err(idx) => {
310 : if node.level == 0 {
311 : // Imagine that the node contains the following keys:
312 : //
313 : // 1
314 : // 3 <-- idx
315 : // 5
316 : //
317 : // If the search key is '2' and there is exact match,
318 : // the binary search would return the index of key
319 : // '3'. That's cool, '3' is the first key to return.
320 : idx
321 : } else {
322 : // This is an internal page, so each key represents a lower
323 : // bound for what's in the child page. If there is no exact
324 : // match, we have to return the *previous* entry.
325 : //
326 : // 1 <-- return this
327 : // 3 <-- idx
328 : // 5
329 : idx.saturating_sub(1)
330 : }
331 : }
332 : };
333 : Either::Left(idx..node.num_children.into())
334 : };
335 :
336 : // idx points to the first match now. Keep going from there
337 : while let Some(idx) = iter.next() {
338 : let key_off = idx * suffix_len;
339 : let suffix = &node.keys[key_off..key_off + suffix_len];
340 : keybuf[prefix_len..].copy_from_slice(suffix);
341 : let value = node.value(idx);
342 : #[allow(clippy::collapsible_if)]
343 : if node.level == 0 {
344 : // leaf
345 : yield (keybuf.clone(), value.to_u64());
346 : } else {
347 : stack.push((node_blknum, Some(iter)));
348 : stack.push((value.to_blknum(), None));
349 : break;
350 : }
351 : }
352 : }
353 : }
354 64266 : }
355 :
356 : ///
357 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
358 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
359 : /// backwards)
360 : ///
361 623556 : pub async fn visit<V>(
362 623556 : &self,
363 623556 : search_key: &[u8; L],
364 623556 : dir: VisitDirection,
365 623556 : mut visitor: V,
366 623556 : ctx: &RequestContext,
367 623556 : ) -> Result<bool>
368 623556 : where
369 623556 : V: FnMut(&[u8], u64) -> bool,
370 623556 : {
371 623556 : let mut stack = Vec::new();
372 623556 : stack.push((self.root_blk, None));
373 623556 : let block_cursor = self.reader.block_cursor();
374 1629362 : while let Some((node_blknum, opt_iter)) = stack.pop() {
375 : // Locate the node.
376 1428826 : let node_buf = block_cursor
377 1428826 : .read_blk(self.start_blk + node_blknum, ctx)
378 23333 : .await?;
379 :
380 1428826 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
381 1428826 : let prefix_len = node.prefix_len as usize;
382 1428826 : let suffix_len = node.suffix_len as usize;
383 1428826 :
384 1428826 : assert!(node.num_children > 0);
385 :
386 1428826 : let mut keybuf = Vec::new();
387 1428826 : keybuf.extend(node.prefix);
388 1428826 : keybuf.resize(prefix_len + suffix_len, 0);
389 :
390 1428826 : let mut iter = if let Some(iter) = opt_iter {
391 203898 : iter
392 1224928 : } else if dir == VisitDirection::Forwards {
393 : // Locate the first match
394 817687 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
395 210378 : Ok(idx) => idx,
396 607309 : Err(idx) => {
397 607309 : if node.level == 0 {
398 : // Imagine that the node contains the following keys:
399 : //
400 : // 1
401 : // 3 <-- idx
402 : // 5
403 : //
404 : // If the search key is '2' and there is exact match,
405 : // the binary search would return the index of key
406 : // '3'. That's cool, '3' is the first key to return.
407 208120 : idx
408 : } else {
409 : // This is an internal page, so each key represents a lower
410 : // bound for what's in the child page. If there is no exact
411 : // match, we have to return the *previous* entry.
412 : //
413 : // 1 <-- return this
414 : // 3 <-- idx
415 : // 5
416 399189 : idx.saturating_sub(1)
417 : }
418 : }
419 : };
420 817687 : Either::Left(idx..node.num_children.into())
421 : } else {
422 407241 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
423 2206 : Ok(idx) => {
424 2206 : // Exact match. That's the first entry to return, and walk
425 2206 : // backwards from there.
426 2206 : idx
427 : }
428 405035 : Err(idx) => {
429 : // No exact match. The binary search returned the index of the
430 : // first key that's > search_key. Back off by one, and walk
431 : // backwards from there.
432 405035 : if let Some(idx) = idx.checked_sub(1) {
433 398976 : idx
434 : } else {
435 6059 : return Ok(false);
436 : }
437 : }
438 : };
439 401182 : Either::Right((0..=idx).rev())
440 : };
441 :
442 : // idx points to the first match now. Keep going from there
443 3566729 : while let Some(idx) = iter.next() {
444 3162295 : let key_off = idx * suffix_len;
445 3162295 : let suffix = &node.keys[key_off..key_off + suffix_len];
446 3162295 : keybuf[prefix_len..].copy_from_slice(suffix);
447 3162295 : let value = node.value(idx);
448 3162295 : #[allow(clippy::collapsible_if)]
449 3162295 : if node.level == 0 {
450 : // leaf
451 2560923 : if !visitor(&keybuf, value.to_u64()) {
452 416961 : return Ok(false);
453 2143962 : }
454 : } else {
455 601372 : stack.push((node_blknum, Some(iter)));
456 601372 : stack.push((value.to_blknum(), None));
457 601372 : break;
458 : }
459 : }
460 : }
461 200536 : Ok(true)
462 623556 : }
463 :
464 : #[allow(dead_code)]
465 10 : pub async fn dump(&self) -> Result<()> {
466 10 : let mut stack = Vec::new();
467 10 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
468 10 :
469 10 : stack.push((self.root_blk, String::new(), 0, 0, 0));
470 10 :
471 10 : let block_cursor = self.reader.block_cursor();
472 :
473 6042 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
474 6032 : let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
475 6032 : let buf: &[u8] = blk.as_ref();
476 6032 : let node = OnDiskNode::<L>::deparse(buf)?;
477 :
478 6032 : if child_idx == 0 {
479 18 : print!("{:indent$}", "", indent = depth * 2);
480 18 : let path_prefix = stack
481 18 : .iter()
482 18 : .map(|(_blknum, path, ..)| path.as_str())
483 18 : .collect::<String>();
484 18 : println!(
485 18 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
486 18 : hex::encode(node.prefix),
487 18 : node.suffix_len
488 18 : );
489 6014 : }
490 :
491 6032 : if child_idx + 1 < node.num_children {
492 6014 : let key_off = key_off + node.suffix_len as usize;
493 6014 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
494 6014 : }
495 6032 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
496 6032 : let val = node.value(child_idx as usize);
497 6032 :
498 6032 : print!("{:indent$}", "", indent = depth * 2 + 2);
499 6032 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
500 6032 :
501 6032 : if node.level > 0 {
502 8 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
503 6024 : }
504 : }
505 10 : Ok(())
506 10 : }
507 : }
508 :
509 : pub struct DiskBtreeIterator<'a> {
510 : #[allow(clippy::type_complexity)]
511 : stream: std::pin::Pin<
512 : Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
513 : >,
514 : }
515 :
516 : impl<'a> DiskBtreeIterator<'a> {
517 195152 : pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
518 195152 : self.stream.next().await
519 195152 : }
520 : }
521 :
522 : ///
523 : /// Public builder object, for creating a new tree.
524 : ///
525 : /// Usage: Create a builder object by calling 'new', load all the data into the
526 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
527 : ///
528 : /// 'L' is the key length in bytes
529 : pub struct DiskBtreeBuilder<W, const L: usize>
530 : where
531 : W: BlockWriter,
532 : {
533 : writer: W,
534 :
535 : ///
536 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
537 : ///
538 : /// We maintain the length of the stack to be always greater than zero.
539 : /// Two exceptions are:
540 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
541 : /// So because other methods cannot see the intermediate state invariant still holds.
542 : /// 2. `Self::finish`. It consumes self and does not return it back,
543 : /// which means that this is where the structure is destroyed.
544 : /// Thus stack of zero length cannot be observed by other methods.
545 : stack: Vec<BuildNode<L>>,
546 :
547 : /// Last key that was appended to the tree. Used to sanity check that append
548 : /// is called in increasing key order.
549 : last_key: Option<[u8; L]>,
550 : }
551 :
552 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
553 : where
554 : W: BlockWriter,
555 : {
556 1562 : pub fn new(writer: W) -> Self {
557 1562 : DiskBtreeBuilder {
558 1562 : writer,
559 1562 : last_key: None,
560 1562 : stack: vec![BuildNode::new(0)],
561 1562 : }
562 1562 : }
563 :
564 7187576 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
565 7187576 : if value > MAX_VALUE {
566 0 : return Err(DiskBtreeError::AppendOverflow(value));
567 7187576 : }
568 7187576 : if let Some(last_key) = &self.last_key {
569 7186020 : if key <= last_key {
570 2 : return Err(DiskBtreeError::UnsortedInput {
571 2 : key: key.as_slice().into(),
572 2 : last_key: last_key.as_slice().into(),
573 2 : });
574 7186018 : }
575 1556 : }
576 7187574 : self.last_key = Some(*key);
577 7187574 :
578 7187574 : self.append_internal(key, Value::from_u64(value))
579 7187576 : }
580 :
581 7200435 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
582 7200435 : // Try to append to the current leaf buffer
583 7200435 : let last = self
584 7200435 : .stack
585 7200435 : .last_mut()
586 7200435 : .expect("should always have at least one item");
587 7200435 : let level = last.level;
588 7200435 : if last.push(key, value) {
589 7175982 : return Ok(());
590 24453 : }
591 24453 :
592 24453 : // It did not fit. Try to compress, and if it succeeds to make
593 24453 : // some room on the node, try appending to it again.
594 24453 : #[allow(clippy::collapsible_if)]
595 24453 : if last.compress() {
596 12389 : if last.push(key, value) {
597 12380 : return Ok(());
598 9 : }
599 12064 : }
600 :
601 : // Could not append to the current leaf. Flush it and create a new one.
602 12073 : self.flush_node()?;
603 :
604 : // Replace the node we flushed with an empty one and append the new
605 : // key to it.
606 12073 : let mut last = BuildNode::new(level);
607 12073 : if !last.push(key, value) {
608 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
609 12073 : }
610 12073 :
611 12073 : self.stack.push(last);
612 12073 :
613 12073 : Ok(())
614 7200435 : }
615 :
616 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
617 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
618 : /// creates a new root page, increasing the height of the tree.
619 12861 : fn flush_node(&mut self) -> Result<()> {
620 12861 : // Get the current bottommost node in the stack and flush it to disk.
621 12861 : let last = self
622 12861 : .stack
623 12861 : .pop()
624 12861 : .expect("should always have at least one item");
625 12861 : let buf = last.pack();
626 12861 : let downlink_key = last.first_key();
627 12861 : let downlink_ptr = self.writer.write_blk(buf)?;
628 :
629 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
630 : // create a new root page, increasing the height of the tree.
631 12861 : if self.stack.is_empty() {
632 788 : self.stack.push(BuildNode::new(last.level + 1));
633 12073 : }
634 12861 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
635 12861 : }
636 :
637 : ///
638 : /// Flushes everything to disk, and returns the block number of the root page.
639 : /// The caller must store the root block number "out-of-band", and pass it
640 : /// to the DiskBtreeReader::new() when you want to read the tree again.
641 : /// (In the image and delta layers, it is stored in the beginning of the file,
642 : /// in the summary header)
643 : ///
644 1554 : pub fn finish(mut self) -> Result<(u32, W)> {
645 : // flush all levels, except the root.
646 2342 : while self.stack.len() > 1 {
647 788 : self.flush_node()?;
648 : }
649 :
650 1554 : let root = self
651 1554 : .stack
652 1554 : .first()
653 1554 : .expect("by the check above we left one item there");
654 1554 : let buf = root.pack();
655 1554 : let root_blknum = self.writer.write_blk(buf)?;
656 :
657 1554 : Ok((root_blknum, self.writer))
658 1554 : }
659 :
660 2023972 : pub fn borrow_writer(&self) -> &W {
661 2023972 : &self.writer
662 2023972 : }
663 : }
664 :
665 : ///
666 : /// BuildNode represesnts an incomplete page that we are appending to.
667 : ///
668 : #[derive(Clone, Debug)]
669 : struct BuildNode<const L: usize> {
670 : num_children: u16,
671 : level: u8,
672 : prefix: Vec<u8>,
673 : suffix_len: usize,
674 :
675 : keys: Vec<u8>,
676 : values: Vec<u8>,
677 :
678 : size: usize, // physical size of this node, if it was written to disk like this
679 : }
680 :
681 : const NODE_SIZE: usize = PAGE_SZ;
682 :
683 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
684 :
685 : impl<const L: usize> BuildNode<L> {
686 14423 : fn new(level: u8) -> Self {
687 14423 : BuildNode {
688 14423 : num_children: 0,
689 14423 : level,
690 14423 : prefix: Vec::new(),
691 14423 : suffix_len: 0,
692 14423 : keys: Vec::new(),
693 14423 : values: Vec::new(),
694 14423 : size: NODE_HDR_SIZE,
695 14423 : }
696 14423 : }
697 :
698 : /// Try to append a key-value pair to this node. Returns 'true' on
699 : /// success, 'false' if the page was full or the key was
700 : /// incompatible with the prefix of the existing keys.
701 7224897 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
702 7224897 : // If we have already performed prefix-compression on the page,
703 7224897 : // check that the incoming key has the same prefix.
704 7224897 : if self.num_children > 0 {
705 : // does the prefix allow it?
706 7210480 : if !key.starts_with(&self.prefix) {
707 219 : return false;
708 7210261 : }
709 14417 : } else {
710 14417 : self.suffix_len = key.len();
711 14417 : }
712 :
713 : // Is the node too full?
714 7224678 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
715 24243 : return false;
716 7200435 : }
717 7200435 :
718 7200435 : // All clear
719 7200435 : self.num_children += 1;
720 7200435 : self.keys.extend(&key[self.prefix.len()..]);
721 7200435 : self.values.extend(value.0);
722 7200435 :
723 7200435 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
724 7200435 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
725 :
726 7200435 : self.size += self.suffix_len + VALUE_SZ;
727 7200435 :
728 7200435 : true
729 7224897 : }
730 :
731 : ///
732 : /// Perform prefix-compression.
733 : ///
734 : /// Returns 'true' on success, 'false' if no compression was possible.
735 : ///
736 24453 : fn compress(&mut self) -> bool {
737 24453 : let first_suffix = self.first_suffix();
738 24453 : let last_suffix = self.last_suffix();
739 24453 :
740 24453 : // Find the common prefix among all keys
741 24453 : let mut prefix_len = 0;
742 221998 : while prefix_len < self.suffix_len {
743 221998 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
744 24453 : break;
745 197545 : }
746 197545 : prefix_len += 1;
747 : }
748 24453 : if prefix_len == 0 {
749 12064 : return false;
750 12389 : }
751 12389 :
752 12389 : // Can compress. Rewrite the keys without the common prefix.
753 12389 : self.prefix.extend(&self.keys[..prefix_len]);
754 12389 :
755 12389 : let mut new_keys = Vec::new();
756 12389 : let mut key_off = 0;
757 3350550 : while key_off < self.keys.len() {
758 3338161 : let next_key_off = key_off + self.suffix_len;
759 3338161 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
760 3338161 : key_off = next_key_off;
761 3338161 : }
762 12389 : self.keys = new_keys;
763 12389 : self.suffix_len -= prefix_len;
764 12389 :
765 12389 : self.size -= prefix_len * self.num_children as usize;
766 12389 : self.size += prefix_len;
767 12389 :
768 12389 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
769 12389 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
770 :
771 12389 : true
772 24453 : }
773 :
774 : ///
775 : /// Serialize the node to on-disk format.
776 : ///
777 14415 : fn pack(&self) -> Bytes {
778 14415 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
779 14415 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
780 14415 : assert!(self.num_children > 0);
781 :
782 14415 : let mut buf = BytesMut::new();
783 14415 :
784 14415 : buf.put_u16(self.num_children);
785 14415 : buf.put_u8(self.level);
786 14415 : buf.put_u8(self.prefix.len() as u8);
787 14415 : buf.put_u8(self.suffix_len as u8);
788 14415 : buf.put(&self.prefix[..]);
789 14415 : buf.put(&self.keys[..]);
790 14415 : buf.put(&self.values[..]);
791 14415 :
792 14415 : assert!(buf.len() == self.size);
793 :
794 14415 : assert!(buf.len() <= PAGE_SZ);
795 14415 : buf.resize(PAGE_SZ, 0);
796 14415 : buf.freeze()
797 14415 : }
798 :
799 37314 : fn first_suffix(&self) -> &[u8] {
800 37314 : &self.keys[..self.suffix_len]
801 37314 : }
802 24453 : fn last_suffix(&self) -> &[u8] {
803 24453 : &self.keys[self.keys.len() - self.suffix_len..]
804 24453 : }
805 :
806 : /// Return the full first key of the page, including the prefix
807 12861 : fn first_key(&self) -> [u8; L] {
808 12861 : let mut key = [0u8; L];
809 12861 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
810 12861 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
811 12861 : key
812 12861 : }
813 : }
814 :
815 : #[cfg(test)]
816 : pub(crate) mod tests {
817 : use super::*;
818 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
819 : use rand::Rng;
820 : use std::collections::BTreeMap;
821 : use std::sync::atomic::{AtomicUsize, Ordering};
822 :
823 : #[derive(Clone, Default)]
824 : pub(crate) struct TestDisk {
825 : blocks: Vec<Bytes>,
826 : }
827 : impl TestDisk {
828 10 : fn new() -> Self {
829 10 : Self::default()
830 10 : }
831 1016850 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
832 1016850 : let mut buf = [0u8; PAGE_SZ];
833 1016850 : buf.copy_from_slice(&self.blocks[blknum as usize]);
834 1016850 : Ok(std::sync::Arc::new(buf).into())
835 1016850 : }
836 : }
837 : impl BlockReader for TestDisk {
838 411262 : fn block_cursor(&self) -> BlockCursor<'_> {
839 411262 : BlockCursor::new(BlockReaderRef::TestDisk(self))
840 411262 : }
841 : }
842 : impl BlockWriter for &mut TestDisk {
843 217 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
844 217 : let blknum = self.blocks.len();
845 217 : self.blocks.push(buf);
846 217 : Ok(blknum as u32)
847 217 : }
848 : }
849 :
850 : #[tokio::test]
851 2 : async fn basic() -> Result<()> {
852 2 : let mut disk = TestDisk::new();
853 2 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
854 2 :
855 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
856 2 :
857 2 : let all_keys: Vec<&[u8; 6]> = vec![
858 2 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
859 2 : ];
860 2 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
861 2 : .iter()
862 2 : .enumerate()
863 16 : .map(|(idx, key)| (*key, idx as u64))
864 2 : .collect();
865 16 : for (key, val) in all_data.iter() {
866 16 : writer.append(key, *val)?;
867 2 : }
868 2 :
869 2 : let (root_offset, _writer) = writer.finish()?;
870 2 :
871 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
872 2 :
873 2 : reader.dump().await?;
874 2 :
875 2 : // Test the `get` function on all the keys.
876 16 : for (key, val) in all_data.iter() {
877 16 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
878 2 : }
879 2 : // And on some keys that don't exist
880 2 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
881 2 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
882 2 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
883 2 :
884 2 : // Test search with `visit` function
885 2 : let search_key = b"xabaaa";
886 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
887 2 : .iter()
888 16 : .filter(|(key, _value)| key[..] >= search_key[..])
889 10 : .map(|(key, value)| (key.to_vec(), *value))
890 2 : .collect();
891 2 :
892 2 : let mut data = Vec::new();
893 2 : reader
894 2 : .visit(
895 2 : search_key,
896 2 : VisitDirection::Forwards,
897 10 : |key, value| {
898 10 : data.push((key.to_vec(), value));
899 10 : true
900 10 : },
901 2 : &ctx,
902 2 : )
903 2 : .await?;
904 2 : assert_eq!(data, expected);
905 2 :
906 2 : // Test a backwards scan
907 2 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
908 2 : .iter()
909 16 : .filter(|(key, _value)| key[..] <= search_key[..])
910 8 : .map(|(key, value)| (key.to_vec(), *value))
911 2 : .collect();
912 2 : expected.reverse();
913 2 : let mut data = Vec::new();
914 2 : reader
915 2 : .visit(
916 2 : search_key,
917 2 : VisitDirection::Backwards,
918 8 : |key, value| {
919 8 : data.push((key.to_vec(), value));
920 8 : true
921 8 : },
922 2 : &ctx,
923 2 : )
924 2 : .await?;
925 2 : assert_eq!(data, expected);
926 2 :
927 2 : // Backward scan where nothing matches
928 2 : reader
929 2 : .visit(
930 2 : b"aaaaaa",
931 2 : VisitDirection::Backwards,
932 2 : |key, value| {
933 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
934 2 : },
935 2 : &ctx,
936 2 : )
937 2 : .await?;
938 2 :
939 2 : // Full scan
940 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
941 2 : .iter()
942 16 : .map(|(key, value)| (key.to_vec(), *value))
943 2 : .collect();
944 2 : let mut data = Vec::new();
945 2 : reader
946 2 : .visit(
947 2 : &[0u8; 6],
948 2 : VisitDirection::Forwards,
949 16 : |key, value| {
950 16 : data.push((key.to_vec(), value));
951 16 : true
952 16 : },
953 2 : &ctx,
954 2 : )
955 2 : .await?;
956 2 : assert_eq!(data, expected);
957 2 :
958 2 : Ok(())
959 2 : }
960 :
961 : #[tokio::test]
962 2 : async fn lots_of_keys() -> Result<()> {
963 2 : let mut disk = TestDisk::new();
964 2 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
965 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
966 2 :
967 2 : const NUM_KEYS: u64 = 1000;
968 2 :
969 2 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
970 2 :
971 2002 : for idx in 0..NUM_KEYS {
972 2000 : let key_int: u64 = 1 + idx * 2;
973 2000 : let key = u64::to_be_bytes(key_int);
974 2000 : writer.append(&key, idx)?;
975 2 :
976 2000 : all_data.insert(key_int, idx);
977 2 : }
978 2 :
979 2 : let (root_offset, _writer) = writer.finish()?;
980 2 :
981 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
982 2 :
983 2 : reader.dump().await?;
984 2 :
985 2 : use std::sync::Mutex;
986 2 :
987 2 : let result = Mutex::new(Vec::new());
988 2 : let limit: AtomicUsize = AtomicUsize::new(10);
989 83820 : let take_ten = |key: &[u8], value: u64| {
990 83820 : let mut keybuf = [0u8; 8];
991 83820 : keybuf.copy_from_slice(key);
992 83820 : let key_int = u64::from_be_bytes(keybuf);
993 83820 :
994 83820 : let mut result = result.lock().unwrap();
995 83820 : result.push((key_int, value));
996 83820 :
997 83820 : // keep going until we have 10 matches
998 83820 : result.len() < limit.load(Ordering::Relaxed)
999 83820 : };
1000 2 :
1001 4020 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
1002 4020 : let search_key = u64::to_be_bytes(search_key_int);
1003 4020 : assert_eq!(
1004 4020 : reader.get(&search_key, &ctx).await?,
1005 4020 : all_data.get(&search_key_int).cloned()
1006 2 : );
1007 2 :
1008 2 : // Test a forward scan starting with this key
1009 4020 : result.lock().unwrap().clear();
1010 4020 : reader
1011 4020 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1012 2 : .await?;
1013 4020 : let expected = all_data
1014 4020 : .range(search_key_int..)
1015 4020 : .take(10)
1016 39820 : .map(|(&key, &val)| (key, val))
1017 4020 : .collect::<Vec<(u64, u64)>>();
1018 4020 : assert_eq!(*result.lock().unwrap(), expected);
1019 2 :
1020 2 : // And a backwards scan
1021 4020 : result.lock().unwrap().clear();
1022 4020 : reader
1023 4020 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1024 2 : .await?;
1025 4020 : let expected = all_data
1026 4020 : .range(..=search_key_int)
1027 4020 : .rev()
1028 4020 : .take(10)
1029 40000 : .map(|(&key, &val)| (key, val))
1030 4020 : .collect::<Vec<(u64, u64)>>();
1031 4020 : assert_eq!(*result.lock().unwrap(), expected);
1032 2 : }
1033 2 :
1034 2 : // full scan
1035 2 : let search_key = u64::to_be_bytes(0);
1036 2 : limit.store(usize::MAX, Ordering::Relaxed);
1037 2 : result.lock().unwrap().clear();
1038 2 : reader
1039 2 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1040 2 : .await?;
1041 2 : let expected = all_data
1042 2 : .iter()
1043 2000 : .map(|(&key, &val)| (key, val))
1044 2 : .collect::<Vec<(u64, u64)>>();
1045 2 : assert_eq!(*result.lock().unwrap(), expected);
1046 2 :
1047 2 : // full scan
1048 2 : let search_key = u64::to_be_bytes(u64::MAX);
1049 2 : limit.store(usize::MAX, Ordering::Relaxed);
1050 2 : result.lock().unwrap().clear();
1051 2 : reader
1052 2 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1053 2 : .await?;
1054 2 : let expected = all_data
1055 2 : .iter()
1056 2 : .rev()
1057 2000 : .map(|(&key, &val)| (key, val))
1058 2 : .collect::<Vec<(u64, u64)>>();
1059 2 : assert_eq!(*result.lock().unwrap(), expected);
1060 2 :
1061 2 : Ok(())
1062 2 : }
1063 :
1064 : #[tokio::test]
1065 2 : async fn random_data() -> Result<()> {
1066 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1067 2 :
1068 2 : // Generate random keys with exponential distribution, to
1069 2 : // exercise the prefix compression
1070 2 : const NUM_KEYS: usize = 100000;
1071 2 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1072 200002 : for idx in 0..NUM_KEYS {
1073 200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1074 200000 : let t = -(f64::ln(u));
1075 200000 : let key_int = (t * 1000000.0) as u128;
1076 200000 :
1077 200000 : all_data.insert(key_int, idx as u64);
1078 200000 : }
1079 2 :
1080 2 : // Build a tree from it
1081 2 : let mut disk = TestDisk::new();
1082 2 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1083 2 :
1084 195150 : for (&key, &val) in all_data.iter() {
1085 195150 : writer.append(&u128::to_be_bytes(key), val)?;
1086 2 : }
1087 2 : let (root_offset, _writer) = writer.finish()?;
1088 2 :
1089 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1090 2 :
1091 2 : // Test get() operation on all the keys
1092 195150 : for (&key, &val) in all_data.iter() {
1093 195150 : let search_key = u128::to_be_bytes(key);
1094 195150 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1095 2 : }
1096 2 :
1097 2 : // Test get() operations on random keys, most of which will not exist
1098 200002 : for _ in 0..100000 {
1099 200000 : let key_int = rand::thread_rng().gen::<u128>();
1100 200000 : let search_key = u128::to_be_bytes(key_int);
1101 200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1102 2 : }
1103 2 :
1104 2 : // Test boundary cases
1105 2 : assert!(
1106 2 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1107 2 : == all_data.get(&u128::MIN).cloned()
1108 2 : );
1109 2 : assert!(
1110 2 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1111 2 : == all_data.get(&u128::MAX).cloned()
1112 2 : );
1113 2 :
1114 2 : // Test iterator and get_stream API
1115 2 : let mut iter = reader.iter(&[0; 16], &ctx);
1116 2 : let mut cnt = 0;
1117 195152 : while let Some(res) = iter.next().await {
1118 195150 : let (key, val) = res?;
1119 195150 : let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
1120 195150 : assert_eq!(val, *all_data.get(&key).unwrap());
1121 195150 : cnt += 1;
1122 2 : }
1123 2 : assert_eq!(cnt, all_data.len());
1124 2 :
1125 2 : Ok(())
1126 2 : }
1127 :
1128 : #[test]
1129 2 : fn unsorted_input() {
1130 2 : let mut disk = TestDisk::new();
1131 2 : let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
1132 2 :
1133 2 : let _ = writer.append(b"ba", 1);
1134 2 : let _ = writer.append(b"bb", 2);
1135 2 : let err = writer.append(b"aa", 3).expect_err("should've failed");
1136 2 : match err {
1137 2 : DiskBtreeError::UnsortedInput { key, last_key } => {
1138 2 : assert_eq!(key.as_ref(), b"aa".as_slice());
1139 2 : assert_eq!(last_key.as_ref(), b"bb".as_slice());
1140 : }
1141 0 : _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
1142 : }
1143 2 : }
1144 :
1145 : ///
1146 : /// This test contains a particular data set, see disk_btree_test_data.rs
1147 : ///
1148 : #[tokio::test]
1149 2 : async fn particular_data() -> Result<()> {
1150 2 : // Build a tree from it
1151 2 : let mut disk = TestDisk::new();
1152 2 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1153 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1154 2 :
1155 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1156 4000 : writer.append(&key, val)?;
1157 2 : }
1158 2 : let (root_offset, writer) = writer.finish()?;
1159 2 :
1160 2 : println!("SIZE: {} blocks", writer.blocks.len());
1161 2 :
1162 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1163 2 :
1164 2 : // Test get() operation on all the keys
1165 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1166 4000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1167 2 : }
1168 2 :
1169 2 : // Test full scan
1170 2 : let mut count = 0;
1171 2 : reader
1172 2 : .visit(
1173 2 : &[0u8; 26],
1174 2 : VisitDirection::Forwards,
1175 4000 : |_key, _value| {
1176 4000 : count += 1;
1177 4000 : true
1178 4000 : },
1179 2 : &ctx,
1180 2 : )
1181 2 : .await?;
1182 2 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1183 2 :
1184 2 : reader.dump().await?;
1185 2 :
1186 2 : Ok(())
1187 2 : }
1188 : }
1189 :
1190 : #[cfg(test)]
1191 : #[path = "disk_btree_test_data.rs"]
1192 : mod disk_btree_test_data;
|