Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use async_stream::try_stream;
22 : use byteorder::{ReadBytesExt, BE};
23 : use bytes::{BufMut, Bytes, BytesMut};
24 : use either::Either;
25 : use futures::Stream;
26 : use hex;
27 : use std::{
28 : cmp::Ordering,
29 : io,
30 : iter::Rev,
31 : ops::{Range, RangeInclusive},
32 : result,
33 : };
34 : use thiserror::Error;
35 : use tracing::error;
36 :
37 : use crate::{
38 : context::{DownloadBehavior, RequestContext},
39 : task_mgr::TaskKind,
40 : tenant::block_io::{BlockReader, BlockWriter},
41 : };
42 :
/// Size in bytes of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
/// Largest value accepted by `DiskBtreeBuilder::append`. The top bit of the first
/// value byte is reserved to tag child block numbers (see `Value::from_blknum`),
/// which leaves 39 bits for offsets.
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;

pub const PAGE_SZ: usize = 8192;

/// A fixed-width (`VALUE_SZ` bytes, big-endian) value as stored in a node.
///
/// Leaf entries hold offsets (built with `from_u64`, tag bit clear); internal
/// entries hold child block numbers tagged with a leading `0x80` byte (built
/// with `from_blknum`).
#[derive(Clone, Copy, Debug)]
struct Value([u8; VALUE_SZ]);

impl Value {
    /// Copy a value out of a slice.
    ///
    /// Panics if `slice.len() != VALUE_SZ`.
    fn from_slice(slice: &[u8]) -> Value {
        let mut b = [0u8; VALUE_SZ];
        b.copy_from_slice(slice);
        Value(b)
    }

    /// Encode an offset as the low `VALUE_SZ` bytes of its big-endian form.
    ///
    /// Panics if `x > MAX_VALUE`.
    fn from_u64(x: u64) -> Value {
        assert!(x <= MAX_VALUE);
        let be = x.to_be_bytes();
        let mut b = [0u8; VALUE_SZ];
        b.copy_from_slice(&be[be.len() - VALUE_SZ..]);
        Value(b)
    }

    /// Encode a child block number, tagged with a leading `0x80` byte.
    fn from_blknum(x: u32) -> Value {
        let [b0, b1, b2, b3] = x.to_be_bytes();
        Value([0x80, b0, b1, b2, b3])
    }

    /// Returns `true` if this value holds an offset (as produced by `from_u64`)
    /// and `false` if it holds a tagged block number (as produced by `from_blknum`).
    #[allow(dead_code)]
    fn is_offset(self) -> bool {
        // Offsets never exceed MAX_VALUE, so their tag bit is always clear;
        // block numbers always have it set.
        (self.0[0] & 0x80) == 0
    }

    /// Decode the value as a big-endian integer (all `VALUE_SZ` bytes).
    fn to_u64(self) -> u64 {
        let mut be = [0u8; 8];
        be[8 - VALUE_SZ..].copy_from_slice(&self.0);
        u64::from_be_bytes(be)
    }

    /// Decode a block number written by `from_blknum`.
    ///
    /// Panics if the value does not carry the `0x80` block-number tag.
    fn to_blknum(self) -> u32 {
        let [tag, b0, b1, b2, b3] = self.0;
        assert!(tag == 0x80);
        u32::from_be_bytes([b0, b1, b2, b3])
    }
}
100 :
/// Errors produced while building or reading an on-disk B-tree.
#[derive(Error, Debug)]
pub enum DiskBtreeError {
    /// `DiskBtreeBuilder::append` was given a value larger than `MAX_VALUE`.
    #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
    AppendOverflow(u64),

    /// `DiskBtreeBuilder::append` was given a key that is not strictly greater
    /// than the previously appended key.
    #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
    UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },

    /// A freshly created, empty node rejected the entry it was created for.
    #[error("Could not push to new leaf node")]
    FailedToPushToNewLeafNode,

    /// Underlying I/O failure, e.g. while reading/writing blocks or reading a
    /// node header.
    #[error("IoError: {0}")]
    Io(#[from] io::Error),
}

/// Result type used by the B-tree reader and builder.
pub type Result<T> = result::Result<T, DiskBtreeError>;
117 :
118 : /// This is the on-disk representation.
119 : struct OnDiskNode<'a, const L: usize> {
120 : // Fixed-width fields
121 : num_children: u16,
122 : level: u8,
123 : prefix_len: u8,
124 : suffix_len: u8,
125 :
126 : // Variable-length fields. These are stored on-disk after the fixed-width
127 : // fields, in this order. In the in-memory representation, these point to
128 : // the right parts in the page buffer.
129 : prefix: &'a [u8],
130 : keys: &'a [u8],
131 : values: &'a [u8],
132 : }
133 :
134 : impl<'a, const L: usize> OnDiskNode<'a, L> {
135 : ///
136 : /// Interpret a PAGE_SZ page as a node.
137 : ///
138 1564359 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
139 1564359 : let mut cursor = std::io::Cursor::new(buf);
140 1564359 : let num_children = cursor.read_u16::<BE>()?;
141 1564359 : let level = cursor.read_u8()?;
142 1564359 : let prefix_len = cursor.read_u8()?;
143 1564359 : let suffix_len = cursor.read_u8()?;
144 :
145 1564359 : let mut off = cursor.position();
146 1564359 : let prefix_off = off as usize;
147 1564359 : off += prefix_len as u64;
148 1564359 :
149 1564359 : let keys_off = off as usize;
150 1564359 : let keys_len = num_children as usize * suffix_len as usize;
151 1564359 : off += keys_len as u64;
152 1564359 :
153 1564359 : let values_off = off as usize;
154 1564359 : let values_len = num_children as usize * VALUE_SZ;
155 1564359 : //off += values_len as u64;
156 1564359 :
157 1564359 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
158 1564359 : let keys = &buf[keys_off..keys_off + keys_len];
159 1564359 : let values = &buf[values_off..values_off + values_len];
160 1564359 :
161 1564359 : Ok(OnDiskNode {
162 1564359 : num_children,
163 1564359 : level,
164 1564359 : prefix_len,
165 1564359 : suffix_len,
166 1564359 : prefix,
167 1564359 : keys,
168 1564359 : values,
169 1564359 : })
170 1564359 : }
171 :
172 : ///
173 : /// Read a value at 'idx'
174 : ///
175 3363783 : fn value(&self, idx: usize) -> Value {
176 3363783 : let value_off = idx * VALUE_SZ;
177 3363783 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
178 3363783 : Value::from_slice(value_slice)
179 3363783 : }
180 :
181 1354266 : fn binary_search(
182 1354266 : &self,
183 1354266 : search_key: &[u8; L],
184 1354266 : keybuf: &mut [u8],
185 1354266 : ) -> result::Result<usize, usize> {
186 1354266 : let mut size = self.num_children as usize;
187 1354266 : let mut low = 0;
188 1354266 : let mut high = size;
189 9957815 : while low < high {
190 8809752 : let mid = low + size / 2;
191 8809752 :
192 8809752 : let key_off = mid * self.suffix_len as usize;
193 8809752 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
194 8809752 : // Does this match?
195 8809752 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
196 8809752 :
197 8809752 : let cmp = keybuf[..].cmp(search_key);
198 8809752 :
199 8809752 : if cmp == Ordering::Less {
200 5597070 : low = mid + 1;
201 5597070 : } else if cmp == Ordering::Greater {
202 3006479 : high = mid;
203 3006479 : } else {
204 206203 : return Ok(mid);
205 : }
206 8603549 : size = high - low;
207 : }
208 1148063 : Err(low)
209 1354266 : }
210 : }
211 :
212 : ///
213 : /// Public reader object, to search the tree.
214 : ///
215 : pub struct DiskBtreeReader<R, const L: usize>
216 : where
217 : R: BlockReader,
218 : {
219 : start_blk: u32,
220 : root_blk: u32,
221 : reader: R,
222 : }
223 :
/// Direction in which `DiskBtreeReader::visit` walks the keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitDirection {
    /// Ascending key order, starting at the first key >= the search key.
    Forwards,
    /// Descending key order, starting at the last key <= the search key.
    Backwards,
}
229 :
230 : impl<R, const L: usize> DiskBtreeReader<R, L>
231 : where
232 : R: BlockReader,
233 : {
234 209968 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
235 209968 : DiskBtreeReader {
236 209968 : start_blk,
237 209968 : root_blk,
238 209968 : reader,
239 209968 : }
240 209968 : }
241 :
242 : ///
243 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
244 : ///
245 403615 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
246 403615 : let mut result: Option<u64> = None;
247 403615 : self.visit(
248 403615 : search_key,
249 403615 : VisitDirection::Forwards,
250 403615 : |key, value| {
251 203591 : if key == search_key {
252 201581 : result = Some(value);
253 201581 : }
254 203591 : false
255 403615 : },
256 403615 : ctx,
257 403615 : )
258 299 : .await?;
259 403615 : Ok(result)
260 403615 : }
261 :
262 : /// Return a stream which yields all key, value pairs from the index
263 : /// starting from the first key greater or equal to `start_key`.
264 : ///
265 : /// Note that this is a copy of [`Self::visit`].
266 : /// TODO: Once the sequential read path is removed this will become
267 : /// the only index traversal method.
268 63958 : pub fn get_stream_from<'a>(
269 63958 : &'a self,
270 63958 : start_key: &'a [u8; L],
271 63958 : ctx: &'a RequestContext,
272 63958 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
273 : try_stream! {
274 : let mut stack = Vec::new();
275 : stack.push((self.root_blk, None));
276 : let block_cursor = self.reader.block_cursor();
277 : while let Some((node_blknum, opt_iter)) = stack.pop() {
278 : // Locate the node.
279 : let node_buf = block_cursor
280 : .read_blk(self.start_blk + node_blknum, ctx)
281 : .await?;
282 :
283 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
284 : let prefix_len = node.prefix_len as usize;
285 : let suffix_len = node.suffix_len as usize;
286 :
287 : assert!(node.num_children > 0);
288 :
289 : let mut keybuf = Vec::new();
290 : keybuf.extend(node.prefix);
291 : keybuf.resize(prefix_len + suffix_len, 0);
292 :
293 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
294 : iter
295 : } else {
296 : // Locate the first match
297 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
298 : Ok(idx) => idx,
299 : Err(idx) => {
300 : if node.level == 0 {
301 : // Imagine that the node contains the following keys:
302 : //
303 : // 1
304 : // 3 <-- idx
305 : // 5
306 : //
307 : // If the search key is '2' and there is exact match,
308 : // the binary search would return the index of key
309 : // '3'. That's cool, '3' is the first key to return.
310 : idx
311 : } else {
312 : // This is an internal page, so each key represents a lower
313 : // bound for what's in the child page. If there is no exact
314 : // match, we have to return the *previous* entry.
315 : //
316 : // 1 <-- return this
317 : // 3 <-- idx
318 : // 5
319 : idx.saturating_sub(1)
320 : }
321 : }
322 : };
323 : Either::Left(idx..node.num_children.into())
324 : };
325 :
326 : // idx points to the first match now. Keep going from there
327 : while let Some(idx) = iter.next() {
328 : let key_off = idx * suffix_len;
329 : let suffix = &node.keys[key_off..key_off + suffix_len];
330 : keybuf[prefix_len..].copy_from_slice(suffix);
331 : let value = node.value(idx);
332 : #[allow(clippy::collapsible_if)]
333 : if node.level == 0 {
334 : // leaf
335 : yield (keybuf.clone(), value.to_u64());
336 : } else {
337 : stack.push((node_blknum, Some(iter)));
338 : stack.push((value.to_blknum(), None));
339 : break;
340 : }
341 : }
342 : }
343 : }
344 63958 : }
345 :
346 : ///
347 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
348 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
349 : /// backwards)
350 : ///
351 620717 : pub async fn visit<V>(
352 620717 : &self,
353 620717 : search_key: &[u8; L],
354 620717 : dir: VisitDirection,
355 620717 : mut visitor: V,
356 620717 : ctx: &RequestContext,
357 620717 : ) -> Result<bool>
358 620717 : where
359 620717 : V: FnMut(&[u8], u64) -> bool,
360 620717 : {
361 620717 : let mut stack = Vec::new();
362 620717 : stack.push((self.root_blk, None));
363 620717 : let block_cursor = self.reader.block_cursor();
364 1630952 : while let Some((node_blknum, opt_iter)) = stack.pop() {
365 : // Locate the node.
366 1430558 : let node_buf = block_cursor
367 1430558 : .read_blk(self.start_blk + node_blknum, ctx)
368 22713 : .await?;
369 :
370 1430558 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
371 1430558 : let prefix_len = node.prefix_len as usize;
372 1430558 : let suffix_len = node.suffix_len as usize;
373 1430558 :
374 1430558 : assert!(node.num_children > 0);
375 :
376 1430558 : let mut keybuf = Vec::new();
377 1430558 : keybuf.extend(node.prefix);
378 1430558 : keybuf.resize(prefix_len + suffix_len, 0);
379 :
380 1430558 : let mut iter = if let Some(iter) = opt_iter {
381 203854 : iter
382 1226704 : } else if dir == VisitDirection::Forwards {
383 : // Locate the first match
384 810808 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
385 203789 : Ok(idx) => idx,
386 607019 : Err(idx) => {
387 607019 : if node.level == 0 {
388 : // Imagine that the node contains the following keys:
389 : //
390 : // 1
391 : // 3 <-- idx
392 : // 5
393 : //
394 : // If the search key is '2' and there is exact match,
395 : // the binary search would return the index of key
396 : // '3'. That's cool, '3' is the first key to return.
397 207956 : idx
398 : } else {
399 : // This is an internal page, so each key represents a lower
400 : // bound for what's in the child page. If there is no exact
401 : // match, we have to return the *previous* entry.
402 : //
403 : // 1 <-- return this
404 : // 3 <-- idx
405 : // 5
406 399063 : idx.saturating_sub(1)
407 : }
408 : }
409 : };
410 810808 : Either::Left(idx..node.num_children.into())
411 : } else {
412 415896 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
413 2228 : Ok(idx) => {
414 2228 : // Exact match. That's the first entry to return, and walk
415 2228 : // backwards from there.
416 2228 : idx
417 : }
418 413668 : Err(idx) => {
419 : // No exact match. The binary search returned the index of the
420 : // first key that's > search_key. Back off by one, and walk
421 : // backwards from there.
422 413668 : if let Some(idx) = idx.checked_sub(1) {
423 408160 : idx
424 : } else {
425 5508 : return Ok(false);
426 : }
427 : }
428 : };
429 410388 : Either::Right((0..=idx).rev())
430 : };
431 :
432 : // idx points to the first match now. Keep going from there
433 3546964 : while let Some(idx) = iter.next() {
434 3142716 : let key_off = idx * suffix_len;
435 3142716 : let suffix = &node.keys[key_off..key_off + suffix_len];
436 3142716 : keybuf[prefix_len..].copy_from_slice(suffix);
437 3142716 : let value = node.value(idx);
438 3142716 : #[allow(clippy::collapsible_if)]
439 3142716 : if node.level == 0 {
440 : // leaf
441 2536729 : if !visitor(&keybuf, value.to_u64()) {
442 414815 : return Ok(false);
443 2121914 : }
444 : } else {
445 605987 : stack.push((node_blknum, Some(iter)));
446 605987 : stack.push((value.to_blknum(), None));
447 605987 : break;
448 : }
449 : }
450 : }
451 200394 : Ok(true)
452 620717 : }
453 :
454 : #[allow(dead_code)]
455 10 : pub async fn dump(&self) -> Result<()> {
456 10 : let mut stack = Vec::new();
457 10 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
458 10 :
459 10 : stack.push((self.root_blk, String::new(), 0, 0, 0));
460 10 :
461 10 : let block_cursor = self.reader.block_cursor();
462 :
463 6042 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
464 6032 : let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
465 6032 : let buf: &[u8] = blk.as_ref();
466 6032 : let node = OnDiskNode::<L>::deparse(buf)?;
467 :
468 6032 : if child_idx == 0 {
469 18 : print!("{:indent$}", "", indent = depth * 2);
470 18 : let path_prefix = stack
471 18 : .iter()
472 18 : .map(|(_blknum, path, ..)| path.as_str())
473 18 : .collect::<String>();
474 18 : println!(
475 18 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
476 18 : hex::encode(node.prefix),
477 18 : node.suffix_len
478 18 : );
479 6014 : }
480 :
481 6032 : if child_idx + 1 < node.num_children {
482 6014 : let key_off = key_off + node.suffix_len as usize;
483 6014 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
484 6014 : }
485 6032 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
486 6032 : let val = node.value(child_idx as usize);
487 6032 :
488 6032 : print!("{:indent$}", "", indent = depth * 2 + 2);
489 6032 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
490 6032 :
491 6032 : if node.level > 0 {
492 8 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
493 6024 : }
494 : }
495 10 : Ok(())
496 10 : }
497 : }
498 :
499 : ///
500 : /// Public builder object, for creating a new tree.
501 : ///
502 : /// Usage: Create a builder object by calling 'new', load all the data into the
503 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
504 : ///
505 : /// 'L' is the key length in bytes
506 : pub struct DiskBtreeBuilder<W, const L: usize>
507 : where
508 : W: BlockWriter,
509 : {
510 : writer: W,
511 :
512 : ///
513 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
514 : ///
515 : /// We maintain the length of the stack to be always greater than zero.
516 : /// Two exceptions are:
517 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
518 : /// So because other methods cannot see the intermediate state invariant still holds.
519 : /// 2. `Self::finish`. It consumes self and does not return it back,
520 : /// which means that this is where the structure is destroyed.
521 : /// Thus stack of zero length cannot be observed by other methods.
522 : stack: Vec<BuildNode<L>>,
523 :
524 : /// Last key that was appended to the tree. Used to sanity check that append
525 : /// is called in increasing key order.
526 : last_key: Option<[u8; L]>,
527 : }
528 :
529 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
530 : where
531 : W: BlockWriter,
532 : {
533 1190 : pub fn new(writer: W) -> Self {
534 1190 : DiskBtreeBuilder {
535 1190 : writer,
536 1190 : last_key: None,
537 1190 : stack: vec![BuildNode::new(0)],
538 1190 : }
539 1190 : }
540 :
541 6606535 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
542 6606535 : if value > MAX_VALUE {
543 0 : return Err(DiskBtreeError::AppendOverflow(value));
544 6606535 : }
545 6606535 : if let Some(last_key) = &self.last_key {
546 6605345 : if key <= last_key {
547 2 : return Err(DiskBtreeError::UnsortedInput {
548 2 : key: key.as_slice().into(),
549 2 : last_key: last_key.as_slice().into(),
550 2 : });
551 6605343 : }
552 1190 : }
553 6606533 : self.last_key = Some(*key);
554 6606533 :
555 6606533 : self.append_internal(key, Value::from_u64(value))
556 6606535 : }
557 :
558 6618841 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
559 6618841 : // Try to append to the current leaf buffer
560 6618841 : let last = self
561 6618841 : .stack
562 6618841 : .last_mut()
563 6618841 : .expect("should always have at least one item");
564 6618841 : let level = last.level;
565 6618841 : if last.push(key, value) {
566 6595429 : return Ok(());
567 23412 : }
568 23412 :
569 23412 : // It did not fit. Try to compress, and if it succeeds to make
570 23412 : // some room on the node, try appending to it again.
571 23412 : #[allow(clippy::collapsible_if)]
572 23412 : if last.compress() {
573 11873 : if last.push(key, value) {
574 11860 : return Ok(());
575 13 : }
576 11539 : }
577 :
578 : // Could not append to the current leaf. Flush it and create a new one.
579 11552 : self.flush_node()?;
580 :
581 : // Replace the node we flushed with an empty one and append the new
582 : // key to it.
583 11552 : let mut last = BuildNode::new(level);
584 11552 : if !last.push(key, value) {
585 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
586 11552 : }
587 11552 :
588 11552 : self.stack.push(last);
589 11552 :
590 11552 : Ok(())
591 6618841 : }
592 :
593 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
594 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
595 : /// creates a new root page, increasing the height of the tree.
596 12308 : fn flush_node(&mut self) -> Result<()> {
597 12308 : // Get the current bottommost node in the stack and flush it to disk.
598 12308 : let last = self
599 12308 : .stack
600 12308 : .pop()
601 12308 : .expect("should always have at least one item");
602 12308 : let buf = last.pack();
603 12308 : let downlink_key = last.first_key();
604 12308 : let downlink_ptr = self.writer.write_blk(buf)?;
605 :
606 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
607 : // create a new root page, increasing the height of the tree.
608 12308 : if self.stack.is_empty() {
609 756 : self.stack.push(BuildNode::new(last.level + 1));
610 11552 : }
611 12308 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
612 12308 : }
613 :
614 : ///
615 : /// Flushes everything to disk, and returns the block number of the root page.
616 : /// The caller must store the root block number "out-of-band", and pass it
617 : /// to the DiskBtreeReader::new() when you want to read the tree again.
618 : /// (In the image and delta layers, it is stored in the beginning of the file,
619 : /// in the summary header)
620 : ///
621 1188 : pub fn finish(mut self) -> Result<(u32, W)> {
622 : // flush all levels, except the root.
623 1944 : while self.stack.len() > 1 {
624 756 : self.flush_node()?;
625 : }
626 :
627 1188 : let root = self
628 1188 : .stack
629 1188 : .first()
630 1188 : .expect("by the check above we left one item there");
631 1188 : let buf = root.pack();
632 1188 : let root_blknum = self.writer.write_blk(buf)?;
633 :
634 1188 : Ok((root_blknum, self.writer))
635 1188 : }
636 :
637 2021974 : pub fn borrow_writer(&self) -> &W {
638 2021974 : &self.writer
639 2021974 : }
640 : }
641 :
/// BuildNode represents an incomplete page that we are appending to.
#[derive(Debug, Clone)]
struct BuildNode<const L: usize> {
    /// Number of entries appended so far.
    num_children: u16,
    /// Tree level of this node; 0 for leaves.
    level: u8,
    /// Bytes common to every key in the node, factored out of `keys`.
    prefix: Vec<u8>,
    /// Length of each key suffix stored in `keys`.
    suffix_len: usize,

    /// Concatenated key suffixes, `suffix_len` bytes each.
    keys: Vec<u8>,
    /// Concatenated encoded values, one per entry.
    values: Vec<u8>,

    /// Physical size of this node, if it was written to disk like this.
    size: usize,
}
657 :
658 : const NODE_SIZE: usize = PAGE_SZ;
659 :
660 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
661 :
662 : impl<const L: usize> BuildNode<L> {
663 13498 : fn new(level: u8) -> Self {
664 13498 : BuildNode {
665 13498 : num_children: 0,
666 13498 : level,
667 13498 : prefix: Vec::new(),
668 13498 : suffix_len: 0,
669 13498 : keys: Vec::new(),
670 13498 : values: Vec::new(),
671 13498 : size: NODE_HDR_SIZE,
672 13498 : }
673 13498 : }
674 :
675 : /// Try to append a key-value pair to this node. Returns 'true' on
676 : /// success, 'false' if the page was full or the key was
677 : /// incompatible with the prefix of the existing keys.
678 6642266 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
679 6642266 : // If we have already performed prefix-compression on the page,
680 6642266 : // check that the incoming key has the same prefix.
681 6642266 : if self.num_children > 0 {
682 : // does the prefix allow it?
683 6628768 : if !key.starts_with(&self.prefix) {
684 223 : return false;
685 6628545 : }
686 13498 : } else {
687 13498 : self.suffix_len = key.len();
688 13498 : }
689 :
690 : // Is the node too full?
691 6642043 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
692 23202 : return false;
693 6618841 : }
694 6618841 :
695 6618841 : // All clear
696 6618841 : self.num_children += 1;
697 6618841 : self.keys.extend(&key[self.prefix.len()..]);
698 6618841 : self.values.extend(value.0);
699 6618841 :
700 6618841 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
701 6618841 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
702 :
703 6618841 : self.size += self.suffix_len + VALUE_SZ;
704 6618841 :
705 6618841 : true
706 6642266 : }
707 :
708 : ///
709 : /// Perform prefix-compression.
710 : ///
711 : /// Returns 'true' on success, 'false' if no compression was possible.
712 : ///
713 23412 : fn compress(&mut self) -> bool {
714 23412 : let first_suffix = self.first_suffix();
715 23412 : let last_suffix = self.last_suffix();
716 23412 :
717 23412 : // Find the common prefix among all keys
718 23412 : let mut prefix_len = 0;
719 212885 : while prefix_len < self.suffix_len {
720 212885 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
721 23412 : break;
722 189473 : }
723 189473 : prefix_len += 1;
724 : }
725 23412 : if prefix_len == 0 {
726 11539 : return false;
727 11873 : }
728 11873 :
729 11873 : // Can compress. Rewrite the keys without the common prefix.
730 11873 : self.prefix.extend(&self.keys[..prefix_len]);
731 11873 :
732 11873 : let mut new_keys = Vec::new();
733 11873 : let mut key_off = 0;
734 3171575 : while key_off < self.keys.len() {
735 3159702 : let next_key_off = key_off + self.suffix_len;
736 3159702 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
737 3159702 : key_off = next_key_off;
738 3159702 : }
739 11873 : self.keys = new_keys;
740 11873 : self.suffix_len -= prefix_len;
741 11873 :
742 11873 : self.size -= prefix_len * self.num_children as usize;
743 11873 : self.size += prefix_len;
744 11873 :
745 11873 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
746 11873 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
747 :
748 11873 : true
749 23412 : }
750 :
751 : ///
752 : /// Serialize the node to on-disk format.
753 : ///
754 13496 : fn pack(&self) -> Bytes {
755 13496 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
756 13496 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
757 13496 : assert!(self.num_children > 0);
758 :
759 13496 : let mut buf = BytesMut::new();
760 13496 :
761 13496 : buf.put_u16(self.num_children);
762 13496 : buf.put_u8(self.level);
763 13496 : buf.put_u8(self.prefix.len() as u8);
764 13496 : buf.put_u8(self.suffix_len as u8);
765 13496 : buf.put(&self.prefix[..]);
766 13496 : buf.put(&self.keys[..]);
767 13496 : buf.put(&self.values[..]);
768 13496 :
769 13496 : assert!(buf.len() == self.size);
770 :
771 13496 : assert!(buf.len() <= PAGE_SZ);
772 13496 : buf.resize(PAGE_SZ, 0);
773 13496 : buf.freeze()
774 13496 : }
775 :
776 35720 : fn first_suffix(&self) -> &[u8] {
777 35720 : &self.keys[..self.suffix_len]
778 35720 : }
779 23412 : fn last_suffix(&self) -> &[u8] {
780 23412 : &self.keys[self.keys.len() - self.suffix_len..]
781 23412 : }
782 :
783 : /// Return the full first key of the page, including the prefix
784 12308 : fn first_key(&self) -> [u8; L] {
785 12308 : let mut key = [0u8; L];
786 12308 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
787 12308 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
788 12308 : key
789 12308 : }
790 : }
791 :
792 : #[cfg(test)]
793 : pub(crate) mod tests {
794 : use super::*;
795 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
796 : use rand::Rng;
797 : use std::collections::BTreeMap;
798 : use std::sync::atomic::{AtomicUsize, Ordering};
799 :
800 : #[derive(Clone, Default)]
801 : pub(crate) struct TestDisk {
802 : blocks: Vec<Bytes>,
803 : }
804 : impl TestDisk {
805 10 : fn new() -> Self {
806 10 : Self::default()
807 10 : }
808 1016236 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
809 1016236 : let mut buf = [0u8; PAGE_SZ];
810 1016236 : buf.copy_from_slice(&self.blocks[blknum as usize]);
811 1016236 : Ok(std::sync::Arc::new(buf).into())
812 1016236 : }
813 : }
814 : impl BlockReader for TestDisk {
815 411153 : fn block_cursor(&self) -> BlockCursor<'_> {
816 411153 : BlockCursor::new(BlockReaderRef::TestDisk(self))
817 411153 : }
818 : }
819 : impl BlockWriter for &mut TestDisk {
820 216 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
821 216 : let blknum = self.blocks.len();
822 216 : self.blocks.push(buf);
823 216 : Ok(blknum as u32)
824 216 : }
825 : }
826 :
827 : #[tokio::test]
828 2 : async fn basic() -> Result<()> {
829 2 : let mut disk = TestDisk::new();
830 2 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
831 2 :
832 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
833 2 :
834 2 : let all_keys: Vec<&[u8; 6]> = vec![
835 2 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
836 2 : ];
837 2 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
838 2 : .iter()
839 2 : .enumerate()
840 16 : .map(|(idx, key)| (*key, idx as u64))
841 2 : .collect();
842 16 : for (key, val) in all_data.iter() {
843 16 : writer.append(key, *val)?;
844 2 : }
845 2 :
846 2 : let (root_offset, _writer) = writer.finish()?;
847 2 :
848 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
849 2 :
850 2 : reader.dump().await?;
851 2 :
852 2 : // Test the `get` function on all the keys.
853 16 : for (key, val) in all_data.iter() {
854 16 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
855 2 : }
856 2 : // And on some keys that don't exist
857 2 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
858 2 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
859 2 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
860 2 :
861 2 : // Test search with `visit` function
862 2 : let search_key = b"xabaaa";
863 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
864 2 : .iter()
865 16 : .filter(|(key, _value)| key[..] >= search_key[..])
866 10 : .map(|(key, value)| (key.to_vec(), *value))
867 2 : .collect();
868 2 :
869 2 : let mut data = Vec::new();
870 2 : reader
871 2 : .visit(
872 2 : search_key,
873 2 : VisitDirection::Forwards,
874 10 : |key, value| {
875 10 : data.push((key.to_vec(), value));
876 10 : true
877 10 : },
878 2 : &ctx,
879 2 : )
880 2 : .await?;
881 2 : assert_eq!(data, expected);
882 2 :
883 2 : // Test a backwards scan
884 2 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
885 2 : .iter()
886 16 : .filter(|(key, _value)| key[..] <= search_key[..])
887 8 : .map(|(key, value)| (key.to_vec(), *value))
888 2 : .collect();
889 2 : expected.reverse();
890 2 : let mut data = Vec::new();
891 2 : reader
892 2 : .visit(
893 2 : search_key,
894 2 : VisitDirection::Backwards,
895 8 : |key, value| {
896 8 : data.push((key.to_vec(), value));
897 8 : true
898 8 : },
899 2 : &ctx,
900 2 : )
901 2 : .await?;
902 2 : assert_eq!(data, expected);
903 2 :
904 2 : // Backward scan where nothing matches
905 2 : reader
906 2 : .visit(
907 2 : b"aaaaaa",
908 2 : VisitDirection::Backwards,
909 2 : |key, value| {
910 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
911 2 : },
912 2 : &ctx,
913 2 : )
914 2 : .await?;
915 2 :
916 2 : // Full scan
917 2 : let expected: Vec<(Vec<u8>, u64)> = all_data
918 2 : .iter()
919 16 : .map(|(key, value)| (key.to_vec(), *value))
920 2 : .collect();
921 2 : let mut data = Vec::new();
922 2 : reader
923 2 : .visit(
924 2 : &[0u8; 6],
925 2 : VisitDirection::Forwards,
926 16 : |key, value| {
927 16 : data.push((key.to_vec(), value));
928 16 : true
929 16 : },
930 2 : &ctx,
931 2 : )
932 2 : .await?;
933 2 : assert_eq!(data, expected);
934 2 :
935 2 : Ok(())
936 2 : }
937 :
938 : #[tokio::test]
939 2 : async fn lots_of_keys() -> Result<()> {
940 2 : let mut disk = TestDisk::new();
941 2 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
942 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
943 2 :
944 2 : const NUM_KEYS: u64 = 1000;
945 2 :
946 2 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
947 2 :
948 2002 : for idx in 0..NUM_KEYS {
949 2000 : let key_int: u64 = 1 + idx * 2;
950 2000 : let key = u64::to_be_bytes(key_int);
951 2000 : writer.append(&key, idx)?;
952 2 :
953 2000 : all_data.insert(key_int, idx);
954 2 : }
955 2 :
956 2 : let (root_offset, _writer) = writer.finish()?;
957 2 :
958 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
959 2 :
960 2 : reader.dump().await?;
961 2 :
962 2 : use std::sync::Mutex;
963 2 :
964 2 : let result = Mutex::new(Vec::new());
965 2 : let limit: AtomicUsize = AtomicUsize::new(10);
966 83820 : let take_ten = |key: &[u8], value: u64| {
967 83820 : let mut keybuf = [0u8; 8];
968 83820 : keybuf.copy_from_slice(key);
969 83820 : let key_int = u64::from_be_bytes(keybuf);
970 83820 :
971 83820 : let mut result = result.lock().unwrap();
972 83820 : result.push((key_int, value));
973 83820 :
974 83820 : // keep going until we have 10 matches
975 83820 : result.len() < limit.load(Ordering::Relaxed)
976 83820 : };
977 2 :
978 4020 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
979 4020 : let search_key = u64::to_be_bytes(search_key_int);
980 4020 : assert_eq!(
981 4020 : reader.get(&search_key, &ctx).await?,
982 4020 : all_data.get(&search_key_int).cloned()
983 2 : );
984 2 :
985 2 : // Test a forward scan starting with this key
986 4020 : result.lock().unwrap().clear();
987 4020 : reader
988 4020 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
989 2 : .await?;
990 4020 : let expected = all_data
991 4020 : .range(search_key_int..)
992 4020 : .take(10)
993 39820 : .map(|(&key, &val)| (key, val))
994 4020 : .collect::<Vec<(u64, u64)>>();
995 4020 : assert_eq!(*result.lock().unwrap(), expected);
996 2 :
997 2 : // And a backwards scan
998 4020 : result.lock().unwrap().clear();
999 4020 : reader
1000 4020 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1001 2 : .await?;
1002 4020 : let expected = all_data
1003 4020 : .range(..=search_key_int)
1004 4020 : .rev()
1005 4020 : .take(10)
1006 40000 : .map(|(&key, &val)| (key, val))
1007 4020 : .collect::<Vec<(u64, u64)>>();
1008 4020 : assert_eq!(*result.lock().unwrap(), expected);
1009 2 : }
1010 2 :
1011 2 : // full scan
1012 2 : let search_key = u64::to_be_bytes(0);
1013 2 : limit.store(usize::MAX, Ordering::Relaxed);
1014 2 : result.lock().unwrap().clear();
1015 2 : reader
1016 2 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1017 2 : .await?;
1018 2 : let expected = all_data
1019 2 : .iter()
1020 2000 : .map(|(&key, &val)| (key, val))
1021 2 : .collect::<Vec<(u64, u64)>>();
1022 2 : assert_eq!(*result.lock().unwrap(), expected);
1023 2 :
1024 2 : // full scan
1025 2 : let search_key = u64::to_be_bytes(u64::MAX);
1026 2 : limit.store(usize::MAX, Ordering::Relaxed);
1027 2 : result.lock().unwrap().clear();
1028 2 : reader
1029 2 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1030 2 : .await?;
1031 2 : let expected = all_data
1032 2 : .iter()
1033 2 : .rev()
1034 2000 : .map(|(&key, &val)| (key, val))
1035 2 : .collect::<Vec<(u64, u64)>>();
1036 2 : assert_eq!(*result.lock().unwrap(), expected);
1037 2 :
1038 2 : Ok(())
1039 2 : }
1040 :
1041 : #[tokio::test]
1042 2 : async fn random_data() -> Result<()> {
1043 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1044 2 :
1045 2 : // Generate random keys with exponential distribution, to
1046 2 : // exercise the prefix compression
1047 2 : const NUM_KEYS: usize = 100000;
1048 2 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1049 200002 : for idx in 0..NUM_KEYS {
1050 200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1051 200000 : let t = -(f64::ln(u));
1052 200000 : let key_int = (t * 1000000.0) as u128;
1053 200000 :
1054 200000 : all_data.insert(key_int, idx as u64);
1055 200000 : }
1056 2 :
1057 2 : // Build a tree from it
1058 2 : let mut disk = TestDisk::new();
1059 2 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1060 2 :
1061 195043 : for (&key, &val) in all_data.iter() {
1062 195043 : writer.append(&u128::to_be_bytes(key), val)?;
1063 2 : }
1064 2 : let (root_offset, _writer) = writer.finish()?;
1065 2 :
1066 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1067 2 :
1068 2 : // Test get() operation on all the keys
1069 195043 : for (&key, &val) in all_data.iter() {
1070 195043 : let search_key = u128::to_be_bytes(key);
1071 195043 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1072 2 : }
1073 2 :
1074 2 : // Test get() operations on random keys, most of which will not exist
1075 200002 : for _ in 0..100000 {
1076 200000 : let key_int = rand::thread_rng().gen::<u128>();
1077 200000 : let search_key = u128::to_be_bytes(key_int);
1078 200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1079 2 : }
1080 2 :
1081 2 : // Test boundary cases
1082 2 : assert!(
1083 2 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1084 2 : == all_data.get(&u128::MIN).cloned()
1085 2 : );
1086 2 : assert!(
1087 2 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1088 2 : == all_data.get(&u128::MAX).cloned()
1089 2 : );
1090 2 :
1091 2 : Ok(())
1092 2 : }
1093 :
#[test]
fn unsorted_input() {
    let mut disk = TestDisk::new();
    let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

    // These two keys are in ascending order, so both appends must succeed.
    // Failing loudly here keeps a broken setup from masquerading as a pass of
    // the check below.
    writer
        .append(b"ba", 1)
        .expect("appending the first, sorted key should succeed");
    writer
        .append(b"bb", 2)
        .expect("appending the second, sorted key should succeed");

    // "aa" sorts before the previously appended "bb" and must be rejected with
    // an error that reports both the offending key and the last accepted key.
    let err = writer.append(b"aa", 3).expect_err("should've failed");
    match err {
        DiskBtreeError::UnsortedInput { key, last_key } => {
            assert_eq!(key.as_ref(), b"aa".as_slice());
            assert_eq!(last_key.as_ref(), b"bb".as_slice());
        }
        _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    }
}
1110 :
1111 : ///
1112 : /// This test contains a particular data set, see disk_btree_test_data.rs
1113 : ///
1114 : #[tokio::test]
1115 2 : async fn particular_data() -> Result<()> {
1116 2 : // Build a tree from it
1117 2 : let mut disk = TestDisk::new();
1118 2 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1119 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1120 2 :
1121 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1122 4000 : writer.append(&key, val)?;
1123 2 : }
1124 2 : let (root_offset, writer) = writer.finish()?;
1125 2 :
1126 2 : println!("SIZE: {} blocks", writer.blocks.len());
1127 2 :
1128 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1129 2 :
1130 2 : // Test get() operation on all the keys
1131 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1132 4000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1133 2 : }
1134 2 :
1135 2 : // Test full scan
1136 2 : let mut count = 0;
1137 2 : reader
1138 2 : .visit(
1139 2 : &[0u8; 26],
1140 2 : VisitDirection::Forwards,
1141 4000 : |_key, _value| {
1142 4000 : count += 1;
1143 4000 : true
1144 4000 : },
1145 2 : &ctx,
1146 2 : )
1147 2 : .await?;
1148 2 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1149 2 :
1150 2 : reader.dump().await?;
1151 2 :
1152 2 : Ok(())
1153 2 : }
1154 : }
1155 :
// Fixed data set used by the `particular_data` test (read as
// `disk_btree_test_data::TEST_DATA`). It is compiled only for tests, and the
// `#[path]` attribute loads it from "disk_btree_test_data.rs".
#[cfg(test)]
#[path = "disk_btree_test_data.rs"]
mod disk_btree_test_data;
|