//!
//! Simple on-disk B-tree implementation
//!
//! This is used as the index structure within image and delta layers
//!
//! Features:
//! - Fixed-width keys
//! - Fixed-width values (VALUE_SZ)
//! - The tree is created in a bulk operation. Insertion/deletion after creation
//!   is not supported
//! - page-oriented
//!
//! TODO:
//! - maybe something like an Adaptive Radix Tree would be more efficient?
//! - the values stored by image and delta layers are offsets into the file,
//!   and they are in monotonically increasing order. Prefix compression would
//!   be very useful for them, too.
//! - An Iterator interface would be more convenient for the callers than the
//!   'visit' function
//!
21 : use byteorder::{ReadBytesExt, BE};
22 : use bytes::{BufMut, Bytes, BytesMut};
23 : use either::Either;
24 : use hex;
25 : use std::{cmp::Ordering, io, result};
26 : use thiserror::Error;
27 : use tracing::error;
28 :
29 : use crate::{
30 : context::{DownloadBehavior, RequestContext},
31 : task_mgr::TaskKind,
32 : tenant::block_io::{BlockReader, BlockWriter},
33 : };
34 :
/// Width, in bytes, of every value slot stored in the B-tree. Five bytes is enough currently.
pub const VALUE_SZ: usize = 5;

/// Largest value accepted by `DiskBtreeBuilder::append`: a 39-bit number.
/// The top bit of the first value byte is reserved to tag child-block pointers
/// stored in internal nodes.
pub const MAX_VALUE: u64 = (1 << 39) - 1;

/// Size of one on-disk page; every node is padded to exactly this size.
pub const PAGE_SZ: usize = 8192;
40 :
41 0 : #[derive(Clone, Copy, Debug)]
42 : struct Value([u8; VALUE_SZ]);
43 :
44 : impl Value {
45 3039839 : fn from_slice(slice: &[u8]) -> Value {
46 3039839 : let mut b = [0u8; VALUE_SZ];
47 3039839 : b.copy_from_slice(slice);
48 3039839 : Value(b)
49 3039839 : }
50 :
51 4513257 : fn from_u64(x: u64) -> Value {
52 4513257 : assert!(x <= 0x007f_ffff_ffff);
53 4513257 : Value([
54 4513257 : (x >> 32) as u8,
55 4513257 : (x >> 24) as u8,
56 4513257 : (x >> 16) as u8,
57 4513257 : (x >> 8) as u8,
58 4513257 : x as u8,
59 4513257 : ])
60 4513257 : }
61 :
62 8365 : fn from_blknum(x: u32) -> Value {
63 8365 : Value([
64 8365 : 0x80,
65 8365 : (x >> 24) as u8,
66 8365 : (x >> 16) as u8,
67 8365 : (x >> 8) as u8,
68 8365 : x as u8,
69 8365 : ])
70 8365 : }
71 :
72 : #[allow(dead_code)]
73 0 : fn is_offset(self) -> bool {
74 0 : self.0[0] & 0x80 != 0
75 0 : }
76 :
77 2512246 : fn to_u64(self) -> u64 {
78 2512246 : let b = &self.0;
79 2512246 : (b[0] as u64) << 32
80 2512246 : | (b[1] as u64) << 24
81 2512246 : | (b[2] as u64) << 16
82 2512246 : | (b[3] as u64) << 8
83 2512246 : | b[4] as u64
84 2512246 : }
85 :
86 521569 : fn to_blknum(self) -> u32 {
87 521569 : let b = &self.0;
88 521569 : assert!(b[0] == 0x80);
89 521569 : (b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
90 521569 : }
91 : }
92 :
93 0 : #[derive(Error, Debug)]
94 : pub enum DiskBtreeError {
95 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
96 : AppendOverflow(u64),
97 :
98 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
99 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
100 :
101 : #[error("Could not push to new leaf node")]
102 : FailedToPushToNewLeafNode,
103 :
104 : #[error("IoError: {0}")]
105 : Io(#[from] io::Error),
106 : }
107 :
108 : pub type Result<T> = result::Result<T, DiskBtreeError>;
109 :
110 : /// This is the on-disk representation.
111 : struct OnDiskNode<'a, const L: usize> {
112 : // Fixed-width fields
113 : num_children: u16,
114 : level: u8,
115 : prefix_len: u8,
116 : suffix_len: u8,
117 :
118 : // Variable-length fields. These are stored on-disk after the fixed-width
119 : // fields, in this order. In the in-memory representation, these point to
120 : // the right parts in the page buffer.
121 : prefix: &'a [u8],
122 : keys: &'a [u8],
123 : values: &'a [u8],
124 : }
125 :
126 : impl<'a, const L: usize> OnDiskNode<'a, L> {
127 : ///
128 : /// Interpret a PAGE_SZ page as a node.
129 : ///
130 1267259 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
131 1267259 : let mut cursor = std::io::Cursor::new(buf);
132 1267259 : let num_children = cursor.read_u16::<BE>()?;
133 1267259 : let level = cursor.read_u8()?;
134 1267259 : let prefix_len = cursor.read_u8()?;
135 1267259 : let suffix_len = cursor.read_u8()?;
136 :
137 1267259 : let mut off = cursor.position();
138 1267259 : let prefix_off = off as usize;
139 1267259 : off += prefix_len as u64;
140 1267259 :
141 1267259 : let keys_off = off as usize;
142 1267259 : let keys_len = num_children as usize * suffix_len as usize;
143 1267259 : off += keys_len as u64;
144 1267259 :
145 1267259 : let values_off = off as usize;
146 1267259 : let values_len = num_children as usize * VALUE_SZ;
147 1267259 : //off += values_len as u64;
148 1267259 :
149 1267259 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
150 1267259 : let keys = &buf[keys_off..keys_off + keys_len];
151 1267259 : let values = &buf[values_off..values_off + values_len];
152 1267259 :
153 1267259 : Ok(OnDiskNode {
154 1267259 : num_children,
155 1267259 : level,
156 1267259 : prefix_len,
157 1267259 : suffix_len,
158 1267259 : prefix,
159 1267259 : keys,
160 1267259 : values,
161 1267259 : })
162 1267259 : }
163 :
164 : ///
165 : /// Read a value at 'idx'
166 : ///
167 3039839 : fn value(&self, idx: usize) -> Value {
168 3039839 : let value_off = idx * VALUE_SZ;
169 3039839 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
170 3039839 : Value::from_slice(value_slice)
171 3039839 : }
172 :
173 1057182 : fn binary_search(
174 1057182 : &self,
175 1057182 : search_key: &[u8; L],
176 1057182 : keybuf: &mut [u8],
177 1057182 : ) -> result::Result<usize, usize> {
178 1057182 : let mut size = self.num_children as usize;
179 1057182 : let mut low = 0;
180 1057182 : let mut high = size;
181 8193238 : while low < high {
182 7342093 : let mid = low + size / 2;
183 7342093 :
184 7342093 : let key_off = mid * self.suffix_len as usize;
185 7342093 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
186 7342093 : // Does this match?
187 7342093 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
188 7342093 :
189 7342093 : let cmp = keybuf[..].cmp(search_key);
190 7342093 :
191 7342093 : if cmp == Ordering::Less {
192 4896110 : low = mid + 1;
193 4896110 : } else if cmp == Ordering::Greater {
194 2239946 : high = mid;
195 2239946 : } else {
196 206037 : return Ok(mid);
197 : }
198 7136056 : size = high - low;
199 : }
200 851145 : Err(low)
201 1057182 : }
202 : }
203 :
204 : ///
205 : /// Public reader object, to search the tree.
206 : ///
207 : pub struct DiskBtreeReader<R, const L: usize>
208 : where
209 : R: BlockReader,
210 : {
211 : start_blk: u32,
212 : root_blk: u32,
213 : reader: R,
214 : }
215 :
/// Order in which `DiskBtreeReader::visit` walks the keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitDirection {
    /// Ascending order, starting at the first key `>=` the search key.
    Forwards,
    /// Descending order, starting at the last key `<=` the search key.
    Backwards,
}
221 :
impl<R, const L: usize> DiskBtreeReader<R, L>
where
    R: BlockReader,
{
    /// Create a reader for the tree whose root page is `root_blk`.
    ///
    /// `start_blk` is added to every block number (including `root_blk`) before the
    /// block is read from `reader`.
    pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
        DiskBtreeReader {
            start_blk,
            root_blk,
            reader,
        }
    }

    ///
    /// Read the value for given key. Returns the value, or None if it doesn't exist.
    ///
    pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
        let mut result: Option<u64> = None;
        self.visit(
            search_key,
            VisitDirection::Forwards,
            |key, value| {
                // The first key visited is the smallest key >= search_key. It is a
                // hit only if it is equal. Either way, stop after this one key.
                if key == search_key {
                    result = Some(value);
                }
                false
            },
            ctx,
        )
        .await?;
        Ok(result)
    }

    ///
    /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
    /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
    /// backwards)
    ///
    /// Scanning stops as soon as `visitor` returns `false`, and `Ok(false)` is returned.
    /// `Ok(true)` is returned if the scan ran off the end of the tree.
    /// A backwards scan also returns `Ok(false)` early, without calling `visitor`, when
    /// a node's binary search finds no key `<=` `search_key` (e.g. `search_key` is below
    /// every key in the tree).
    ///
    pub async fn visit<V>(
        &self,
        search_key: &[u8; L],
        dir: VisitDirection,
        mut visitor: V,
        ctx: &RequestContext,
    ) -> Result<bool>
    where
        V: FnMut(&[u8], u64) -> bool,
    {
        // Depth-first traversal. Each entry is (relative block number, resume iterator).
        // `None` means the node has not been entered yet and must be positioned with a
        // binary search on `search_key`. `Some(iter)` resumes an internal node
        // after the subtree of one of its children has been visited.
        let mut stack = Vec::new();
        stack.push((self.root_blk, None));
        let block_cursor = self.reader.block_cursor();
        while let Some((node_blknum, opt_iter)) = stack.pop() {
            // Locate the node.
            let node_buf = block_cursor
                .read_blk(self.start_blk + node_blknum, ctx)
                .await?;

            let node = OnDiskNode::deparse(node_buf.as_ref())?;
            let prefix_len = node.prefix_len as usize;
            let suffix_len = node.suffix_len as usize;

            assert!(node.num_children > 0);

            // Scratch buffer holding a full key: this node's prefix followed by one suffix.
            let mut keybuf = Vec::new();
            keybuf.extend(node.prefix);
            keybuf.resize(prefix_len + suffix_len, 0);

            let mut iter = if let Some(iter) = opt_iter {
                iter
            } else if dir == VisitDirection::Forwards {
                // Locate the first match
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => idx,
                    Err(idx) => {
                        if node.level == 0 {
                            // Imagine that the node contains the following keys:
                            //
                            // 1
                            // 3 <-- idx
                            // 5
                            //
                            // If the search key is '2' and there is no exact match,
                            // the binary search would return the index of key
                            // '3'. That's cool, '3' is the first key to return.
                            idx
                        } else {
                            // This is an internal page, so each key represents a lower
                            // bound for what's in the child page. If there is no exact
                            // match, we have to return the *previous* entry.
                            //
                            // 1 <-- return this
                            // 3 <-- idx
                            // 5
                            idx.saturating_sub(1)
                        }
                    }
                };
                Either::Left(idx..node.num_children.into())
            } else {
                let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
                    Ok(idx) => {
                        // Exact match. That's the first entry to return, and walk
                        // backwards from there.
                        idx
                    }
                    Err(idx) => {
                        // No exact match. The binary search returned the index of the
                        // first key that's > search_key. Back off by one, and walk
                        // backwards from there.
                        if let Some(idx) = idx.checked_sub(1) {
                            idx
                        } else {
                            return Ok(false);
                        }
                    }
                };
                Either::Right((0..=idx).rev())
            };

            // idx points to the first match now. Keep going from there.
            // This is a manual `while let` rather than `for`, because for internal
            // nodes the partially consumed iterator is moved onto the stack.
            while let Some(idx) = iter.next() {
                let key_off = idx * suffix_len;
                let suffix = &node.keys[key_off..key_off + suffix_len];
                keybuf[prefix_len..].copy_from_slice(suffix);
                let value = node.value(idx);
                #[allow(clippy::collapsible_if)]
                if node.level == 0 {
                    // leaf
                    if !visitor(&keybuf, value.to_u64()) {
                        return Ok(false);
                    }
                } else {
                    // Internal node: re-push this node with its iterator, so that its
                    // remaining children are visited after the child's subtree. Then
                    // push the child, so it is popped next.
                    stack.push((node_blknum, Some(iter)));
                    stack.push((value.to_blknum(), None));
                    break;
                }
            }
        }
        Ok(true)
    }

    /// Print every node and entry of the tree to stdout, indented by depth.
    /// Debugging aid.
    #[allow(dead_code)]
    pub async fn dump(&self) -> Result<()> {
        // Each stack entry is (block number, path label, depth, index of the next entry
        // to print, byte offset of that entry's key suffix).
        let mut stack = Vec::new();
        let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

        stack.push((self.root_blk, String::new(), 0, 0, 0));

        let block_cursor = self.reader.block_cursor();

        while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
            let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
            let buf: &[u8] = blk.as_ref();
            let node = OnDiskNode::<L>::deparse(buf)?;

            if child_idx == 0 {
                // First time this node is reached: print its header line.
                print!("{:indent$}", "", indent = depth * 2);
                let path_prefix = stack
                    .iter()
                    .map(|(_blknum, path, ..)| path.as_str())
                    .collect::<String>();
                println!(
                    "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
                    hex::encode(node.prefix),
                    node.suffix_len
                );
            }

            // Re-push this node for its next entry before descending, so that the
            // next entry is printed after the current entry's subtree.
            if child_idx + 1 < node.num_children {
                let key_off = key_off + node.suffix_len as usize;
                stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
            }
            let key = &node.keys[key_off..key_off + node.suffix_len as usize];
            let val = node.value(child_idx as usize);

            print!("{:indent$}", "", indent = depth * 2 + 2);
            println!("{}: {}", hex::encode(key), hex::encode(val.0));

            if node.level > 0 {
                stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
            }
        }
        Ok(())
    }
}
406 :
407 : ///
408 : /// Public builder object, for creating a new tree.
409 : ///
410 : /// Usage: Create a builder object by calling 'new', load all the data into the
411 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
412 : ///
413 : /// 'L' is the key length in bytes
414 : pub struct DiskBtreeBuilder<W, const L: usize>
415 : where
416 : W: BlockWriter,
417 : {
418 : writer: W,
419 :
420 : ///
421 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
422 : ///
423 : /// We maintain the length of the stack to be always greater than zero.
424 : /// Two exceptions are:
425 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
426 : /// So because other methods cannot see the intermediate state invariant still holds.
427 : /// 2. `Self::finish`. It consumes self and does not return it back,
428 : /// which means that this is where the structure is destroyed.
429 : /// Thus stack of zero length cannot be observed by other methods.
430 : stack: Vec<BuildNode<L>>,
431 :
432 : /// Last key that was appended to the tree. Used to sanity check that append
433 : /// is called in increasing key order.
434 : last_key: Option<[u8; L]>,
435 : }
436 :
437 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
438 : where
439 : W: BlockWriter,
440 : {
441 564 : pub fn new(writer: W) -> Self {
442 564 : DiskBtreeBuilder {
443 564 : writer,
444 564 : last_key: None,
445 564 : stack: vec![BuildNode::new(0)],
446 564 : }
447 564 : }
448 :
449 4513259 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
450 4513259 : if value > MAX_VALUE {
451 0 : return Err(DiskBtreeError::AppendOverflow(value));
452 4513259 : }
453 4513259 : if let Some(last_key) = &self.last_key {
454 4512695 : if key <= last_key {
455 2 : return Err(DiskBtreeError::UnsortedInput {
456 2 : key: key.as_slice().into(),
457 2 : last_key: last_key.as_slice().into(),
458 2 : });
459 4512693 : }
460 564 : }
461 4513257 : self.last_key = Some(*key);
462 4513257 :
463 4513257 : self.append_internal(key, Value::from_u64(value))
464 4513259 : }
465 :
466 4521622 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
467 4521622 : // Try to append to the current leaf buffer
468 4521622 : let last = self
469 4521622 : .stack
470 4521622 : .last_mut()
471 4521622 : .expect("should always have at least one item");
472 4521622 : let level = last.level;
473 4521622 : if last.push(key, value) {
474 4505602 : return Ok(());
475 16020 : }
476 16020 :
477 16020 : // It did not fit. Try to compress, and if it succeeds to make
478 16020 : // some room on the node, try appending to it again.
479 16020 : #[allow(clippy::collapsible_if)]
480 16020 : if last.compress() {
481 8099 : if last.push(key, value) {
482 8091 : return Ok(());
483 8 : }
484 7921 : }
485 :
486 : // Could not append to the current leaf. Flush it and create a new one.
487 7929 : self.flush_node()?;
488 :
489 : // Replace the node we flushed with an empty one and append the new
490 : // key to it.
491 7929 : let mut last = BuildNode::new(level);
492 7929 : if !last.push(key, value) {
493 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
494 7929 : }
495 7929 :
496 7929 : self.stack.push(last);
497 7929 :
498 7929 : Ok(())
499 4521622 : }
500 :
501 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
502 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
503 : /// creates a new root page, increasing the height of the tree.
504 8365 : fn flush_node(&mut self) -> Result<()> {
505 8365 : // Get the current bottommost node in the stack and flush it to disk.
506 8365 : let last = self
507 8365 : .stack
508 8365 : .pop()
509 8365 : .expect("should always have at least one item");
510 8365 : let buf = last.pack();
511 8365 : let downlink_key = last.first_key();
512 8365 : let downlink_ptr = self.writer.write_blk(buf)?;
513 :
514 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
515 : // create a new root page, increasing the height of the tree.
516 8365 : if self.stack.is_empty() {
517 436 : self.stack.push(BuildNode::new(last.level + 1));
518 7929 : }
519 8365 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
520 8365 : }
521 :
522 : ///
523 : /// Flushes everything to disk, and returns the block number of the root page.
524 : /// The caller must store the root block number "out-of-band", and pass it
525 : /// to the DiskBtreeReader::new() when you want to read the tree again.
526 : /// (In the image and delta layers, it is stored in the beginning of the file,
527 : /// in the summary header)
528 : ///
529 562 : pub fn finish(mut self) -> Result<(u32, W)> {
530 : // flush all levels, except the root.
531 998 : while self.stack.len() > 1 {
532 436 : self.flush_node()?;
533 : }
534 :
535 562 : let root = self
536 562 : .stack
537 562 : .first()
538 562 : .expect("by the check above we left one item there");
539 562 : let buf = root.pack();
540 562 : let root_blknum = self.writer.write_blk(buf)?;
541 :
542 562 : Ok((root_blknum, self.writer))
543 562 : }
544 :
545 2009970 : pub fn borrow_writer(&self) -> &W {
546 2009970 : &self.writer
547 2009970 : }
548 : }
549 :
/// An in-memory node (page) that is still being filled by the builder.
///
/// Keys are stored as `suffix_len`-byte suffixes. Once `compress` has run, `prefix`
/// holds the bytes shared by all of them.
#[derive(Clone, Debug)]
struct BuildNode<const L: usize> {
    /// Number of key-value pairs appended so far.
    num_children: u16,
    /// 0 for leaves. A new root is created one level above the node it replaces.
    level: u8,
    /// Common prefix stripped from every stored key.
    prefix: Vec<u8>,
    /// Length of each stored key suffix, in bytes.
    suffix_len: usize,

    /// Concatenated key suffixes: `num_children * suffix_len` bytes.
    keys: Vec<u8>,
    /// Concatenated encoded values: `num_children * VALUE_SZ` bytes.
    values: Vec<u8>,

    /// Physical size of this node if it were written to disk as it is now.
    size: usize,
}
565 :
566 : const NODE_SIZE: usize = PAGE_SZ;
567 :
568 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
569 :
570 : impl<const L: usize> BuildNode<L> {
571 8929 : fn new(level: u8) -> Self {
572 8929 : BuildNode {
573 8929 : num_children: 0,
574 8929 : level,
575 8929 : prefix: Vec::new(),
576 8929 : suffix_len: 0,
577 8929 : keys: Vec::new(),
578 8929 : values: Vec::new(),
579 8929 : size: NODE_HDR_SIZE,
580 8929 : }
581 8929 : }
582 :
583 : /// Try to append a key-value pair to this node. Returns 'true' on
584 : /// success, 'false' if the page was full or the key was
585 : /// incompatible with the prefix of the existing keys.
586 4537650 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
587 4537650 : // If we have already performed prefix-compression on the page,
588 4537650 : // check that the incoming key has the same prefix.
589 4537650 : if self.num_children > 0 {
590 : // does the prefix allow it?
591 4528721 : if !key.starts_with(&self.prefix) {
592 195 : return false;
593 4528526 : }
594 8929 : } else {
595 8929 : self.suffix_len = key.len();
596 8929 : }
597 :
598 : // Is the node too full?
599 4537455 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
600 15833 : return false;
601 4521622 : }
602 4521622 :
603 4521622 : // All clear
604 4521622 : self.num_children += 1;
605 4521622 : self.keys.extend(&key[self.prefix.len()..]);
606 4521622 : self.values.extend(value.0);
607 4521622 :
608 4521622 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
609 4521622 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
610 :
611 4521622 : self.size += self.suffix_len + VALUE_SZ;
612 4521622 :
613 4521622 : true
614 4537650 : }
615 :
616 : ///
617 : /// Perform prefix-compression.
618 : ///
619 : /// Returns 'true' on success, 'false' if no compression was possible.
620 : ///
621 16020 : fn compress(&mut self) -> bool {
622 16020 : let first_suffix = self.first_suffix();
623 16020 : let last_suffix = self.last_suffix();
624 16020 :
625 16020 : // Find the common prefix among all keys
626 16020 : let mut prefix_len = 0;
627 145275 : while prefix_len < self.suffix_len {
628 145275 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
629 16020 : break;
630 129255 : }
631 129255 : prefix_len += 1;
632 : }
633 16020 : if prefix_len == 0 {
634 7921 : return false;
635 8099 : }
636 8099 :
637 8099 : // Can compress. Rewrite the keys without the common prefix.
638 8099 : self.prefix.extend(&self.keys[..prefix_len]);
639 8099 :
640 8099 : let mut new_keys = Vec::new();
641 8099 : let mut key_off = 0;
642 2171590 : while key_off < self.keys.len() {
643 2163491 : let next_key_off = key_off + self.suffix_len;
644 2163491 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
645 2163491 : key_off = next_key_off;
646 2163491 : }
647 8099 : self.keys = new_keys;
648 8099 : self.suffix_len -= prefix_len;
649 8099 :
650 8099 : self.size -= prefix_len * self.num_children as usize;
651 8099 : self.size += prefix_len;
652 8099 :
653 8099 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
654 8099 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
655 :
656 8099 : true
657 16020 : }
658 :
659 : ///
660 : /// Serialize the node to on-disk format.
661 : ///
662 8927 : fn pack(&self) -> Bytes {
663 8927 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
664 8927 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
665 8927 : assert!(self.num_children > 0);
666 :
667 8927 : let mut buf = BytesMut::new();
668 8927 :
669 8927 : buf.put_u16(self.num_children);
670 8927 : buf.put_u8(self.level);
671 8927 : buf.put_u8(self.prefix.len() as u8);
672 8927 : buf.put_u8(self.suffix_len as u8);
673 8927 : buf.put(&self.prefix[..]);
674 8927 : buf.put(&self.keys[..]);
675 8927 : buf.put(&self.values[..]);
676 8927 :
677 8927 : assert!(buf.len() == self.size);
678 :
679 8927 : assert!(buf.len() <= PAGE_SZ);
680 8927 : buf.resize(PAGE_SZ, 0);
681 8927 : buf.freeze()
682 8927 : }
683 :
684 24385 : fn first_suffix(&self) -> &[u8] {
685 24385 : &self.keys[..self.suffix_len]
686 24385 : }
687 16020 : fn last_suffix(&self) -> &[u8] {
688 16020 : &self.keys[self.keys.len() - self.suffix_len..]
689 16020 : }
690 :
691 : /// Return the full first key of the page, including the prefix
692 8365 : fn first_key(&self) -> [u8; L] {
693 8365 : let mut key = [0u8; L];
694 8365 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
695 8365 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
696 8365 : key
697 8365 : }
698 : }
699 :
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
    use rand::Rng;
    use std::collections::BTreeMap;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// In-memory stand-in for a layer file: block `n` is `blocks[n]`.
    #[derive(Clone, Default)]
    pub(crate) struct TestDisk {
        blocks: Vec<Bytes>,
    }
    impl TestDisk {
        fn new() -> Self {
            Self::default()
        }
        /// Copy block `blknum` into a fresh `PAGE_SZ` buffer.
        ///
        /// Panics if the block does not exist or is not exactly `PAGE_SZ` bytes long.
        pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
            let mut buf = [0u8; PAGE_SZ];
            buf.copy_from_slice(&self.blocks[blknum as usize]);
            Ok(std::sync::Arc::new(buf).into())
        }
    }
    impl BlockReader for TestDisk {
        fn block_cursor(&self) -> BlockCursor<'_> {
            BlockCursor::new(BlockReaderRef::TestDisk(self))
        }
    }
    impl BlockWriter for &mut TestDisk {
        /// Append the block and return its index, which serves as its block number.
        fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
            let blknum = self.blocks.len();
            self.blocks.push(buf);
            Ok(blknum as u32)
        }
    }

    /// A handful of keys: get() on present and absent keys, plus forward,
    /// backward, empty and full scans.
    #[tokio::test]
    async fn basic() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        let all_keys: Vec<&[u8; 6]> = vec![
            b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
        ];
        let all_data: Vec<(&[u8; 6], u64)> = all_keys
            .iter()
            .enumerate()
            .map(|(idx, key)| (*key, idx as u64))
            .collect();
        for (key, val) in all_data.iter() {
            writer.append(key, *val)?;
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        // Test the `get` function on all the keys.
        for (key, val) in all_data.iter() {
            assert_eq!(reader.get(key, &ctx).await?, Some(*val));
        }
        // And on some keys that don't exist
        assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
        assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
        assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);

        // Test search with `visit` function
        let search_key = b"xabaaa";
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] >= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();

        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Test a backwards scan
        let mut expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] <= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        expected.reverse();
        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Backwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Backward scan where nothing matches
        reader
            .visit(
                b"aaaaaa",
                VisitDirection::Backwards,
                |key, value| {
                    panic!("found unexpected key {}: {}", hex::encode(key), value);
                },
                &ctx,
            )
            .await?;

        // Full scan
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        let mut data = Vec::new();
        reader
            .visit(
                &[0u8; 6],
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        Ok(())
    }

    /// 1000 odd-numbered keys. Every search key in 0..2010 (present, absent and
    /// out-of-range) is checked against a BTreeMap with get() and with bounded
    /// scans in both directions.
    #[tokio::test]
    async fn lots_of_keys() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        const NUM_KEYS: u64 = 1000;

        let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();

        // Only odd keys are inserted, so every even search key falls between two
        // existing keys.
        for idx in 0..NUM_KEYS {
            let key_int: u64 = 1 + idx * 2;
            let key = u64::to_be_bytes(key_int);
            writer.append(&key, idx)?;

            all_data.insert(key_int, idx);
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        use std::sync::Mutex;

        let result = Mutex::new(Vec::new());
        let limit: AtomicUsize = AtomicUsize::new(10);
        // Visitor that records (key, value) pairs and stops once `limit` pairs are
        // collected.
        let take_ten = |key: &[u8], value: u64| {
            let mut keybuf = [0u8; 8];
            keybuf.copy_from_slice(key);
            let key_int = u64::from_be_bytes(keybuf);

            let mut result = result.lock().unwrap();
            result.push((key_int, value));

            // keep going until we have 10 matches
            result.len() < limit.load(Ordering::Relaxed)
        };

        for search_key_int in 0..(NUM_KEYS * 2 + 10) {
            let search_key = u64::to_be_bytes(search_key_int);
            assert_eq!(
                reader.get(&search_key, &ctx).await?,
                all_data.get(&search_key_int).cloned()
            );

            // Test a forward scan starting with this key
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
                .await?;
            let expected = all_data
                .range(search_key_int..)
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);

            // And a backwards scan
            result.lock().unwrap().clear();
            reader
                .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
                .await?;
            let expected = all_data
                .range(..=search_key_int)
                .rev()
                .take(10)
                .map(|(&key, &val)| (key, val))
                .collect::<Vec<(u64, u64)>>();
            assert_eq!(*result.lock().unwrap(), expected);
        }

        // full scan (forwards, unlimited)
        let search_key = u64::to_be_bytes(0);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
            .await?;
        let expected = all_data
            .iter()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        // full scan (backwards, unlimited)
        let search_key = u64::to_be_bytes(u64::MAX);
        limit.store(usize::MAX, Ordering::Relaxed);
        result.lock().unwrap().clear();
        reader
            .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
            .await?;
        let expected = all_data
            .iter()
            .rev()
            .map(|(&key, &val)| (key, val))
            .collect::<Vec<(u64, u64)>>();
        assert_eq!(*result.lock().unwrap(), expected);

        Ok(())
    }

    /// Random 16-byte keys with an exponential distribution, so many share long
    /// prefixes. Checks get() for all inserted keys, for random keys, and for the
    /// extreme key values.
    #[tokio::test]
    async fn random_data() -> Result<()> {
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Generate random keys with exponential distribution, to
        // exercise the prefix compression
        const NUM_KEYS: usize = 100000;
        let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
        for idx in 0..NUM_KEYS {
            let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
            let t = -(f64::ln(u));
            let key_int = (t * 1000000.0) as u128;

            all_data.insert(key_int, idx as u64);
        }

        // Build a tree from it
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);

        for (&key, &val) in all_data.iter() {
            writer.append(&u128::to_be_bytes(key), val)?;
        }
        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        // Test get() operation on all the keys
        for (&key, &val) in all_data.iter() {
            let search_key = u128::to_be_bytes(key);
            assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
        }

        // Test get() operations on random keys, most of which will not exist
        for _ in 0..100000 {
            let key_int = rand::thread_rng().gen::<u128>();
            let search_key = u128::to_be_bytes(key_int);
            assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
        }

        // Test boundary cases
        assert!(
            reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
                == all_data.get(&u128::MIN).cloned()
        );
        assert!(
            reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
                == all_data.get(&u128::MAX).cloned()
        );

        Ok(())
    }

    /// Appending a key smaller than the previous one must fail with `UnsortedInput`,
    /// reporting both keys.
    #[test]
    fn unsorted_input() {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

        let _ = writer.append(b"ba", 1);
        let _ = writer.append(b"bb", 2);
        let err = writer.append(b"aa", 3).expect_err("should've failed");
        match err {
            DiskBtreeError::UnsortedInput { key, last_key } => {
                assert_eq!(key.as_ref(), b"aa".as_slice());
                assert_eq!(last_key.as_ref(), b"bb".as_slice());
            }
            _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
        }
    }

    ///
    /// This test contains a particular data set, see disk_btree_test_data.rs
    ///
    #[tokio::test]
    async fn particular_data() -> Result<()> {
        // Build a tree from it
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        for (key, val) in disk_btree_test_data::TEST_DATA {
            writer.append(&key, val)?;
        }
        let (root_offset, writer) = writer.finish()?;

        println!("SIZE: {} blocks", writer.blocks.len());

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        // Test get() operation on all the keys
        for (key, val) in disk_btree_test_data::TEST_DATA {
            assert_eq!(reader.get(&key, &ctx).await?, Some(val));
        }

        // Test full scan
        let mut count = 0;
        reader
            .visit(
                &[0u8; 26],
                VisitDirection::Forwards,
                |_key, _value| {
                    count += 1;
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(count, disk_btree_test_data::TEST_DATA.len());

        reader.dump().await?;

        Ok(())
    }
}
1065 :
// Fixed test data set (`TEST_DATA`) used by `tests::particular_data`; compiled only for tests.
#[cfg(test)]
#[path = "disk_btree_test_data.rs"]
mod disk_btree_test_data;
|