Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use async_stream::try_stream;
22 : use byteorder::{ReadBytesExt, BE};
23 : use bytes::{BufMut, Bytes, BytesMut};
24 : use either::Either;
25 : use futures::{Stream, StreamExt};
26 : use hex;
27 : use std::{
28 : cmp::Ordering,
29 : io,
30 : iter::Rev,
31 : ops::{Range, RangeInclusive},
32 : result,
33 : };
34 : use thiserror::Error;
35 : use tracing::error;
36 :
37 : use crate::{
38 : context::{DownloadBehavior, RequestContext},
39 : task_mgr::TaskKind,
40 : tenant::block_io::{BlockReader, BlockWriter},
41 : };
42 :
/// The size in bytes of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
/// The largest value that can be stored with [`Value::from_u64`]. The high bit of
/// the first byte is left clear; `Value::from_blknum` sets it to tag block numbers.
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;

/// Size in bytes of one on-disk page (one tree node).
pub const PAGE_SZ: usize = 8192;

/// A fixed-width (`VALUE_SZ` bytes), big-endian encoded tree value.
///
/// Two encodings are produced by this type:
/// - `from_u64`: a plain value `<= MAX_VALUE`, so the high bit of byte 0 is clear.
/// - `from_blknum`: byte 0 is exactly `0x80`, followed by a big-endian `u32`.
#[derive(Clone, Copy, Debug)]
struct Value([u8; VALUE_SZ]);

impl Value {
    /// Copy an encoded value out of a page buffer.
    ///
    /// Panics if `slice` is not exactly `VALUE_SZ` bytes long.
    fn from_slice(slice: &[u8]) -> Value {
        let mut b = [0u8; VALUE_SZ];
        b.copy_from_slice(slice);
        Value(b)
    }

    /// Encode a plain value.
    ///
    /// Panics if `x > MAX_VALUE`, because the high bit is reserved for the
    /// block-number tag.
    fn from_u64(x: u64) -> Value {
        assert!(x <= MAX_VALUE);
        let be = x.to_be_bytes();
        // Keep the five least significant bytes, most significant first.
        Value([be[3], be[4], be[5], be[6], be[7]])
    }

    /// Encode a block number, tagging it with `0x80` in the first byte.
    fn from_blknum(x: u32) -> Value {
        let [b1, b2, b3, b4] = x.to_be_bytes();
        Value([0x80, b1, b2, b3, b4])
    }

    /// Returns `true` if this is a plain value (as written by `from_u64`) rather
    /// than a tagged block number (as written by `from_blknum`).
    // Currently unused; kept as the counterpart of the tag check in `to_blknum`.
    #[allow(dead_code)]
    fn is_offset(self) -> bool {
        // The tag bit is set for block numbers only, so an offset has it clear.
        self.0[0] & 0x80 == 0
    }

    /// Decode all five bytes as a big-endian integer. No tag check is performed.
    fn to_u64(self) -> u64 {
        let b = &self.0;
        (b[0] as u64) << 32
            | (b[1] as u64) << 24
            | (b[2] as u64) << 16
            | (b[3] as u64) << 8
            | b[4] as u64
    }

    /// Decode a block number.
    ///
    /// Panics if the first byte is not the `0x80` tag written by `from_blknum`.
    fn to_blknum(self) -> u32 {
        let b = &self.0;
        assert!(b[0] == 0x80);
        u32::from_be_bytes([b[1], b[2], b[3], b[4]])
    }
}
100 :
/// Errors returned by the B-tree builder and reader in this module.
#[derive(Error, Debug)]
pub enum DiskBtreeError {
    /// The value passed to the builder's `append` was larger than `MAX_VALUE`.
    #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
    AppendOverflow(u64),

    /// The builder's `append` was called with a key that is not strictly greater
    /// than the previously appended key.
    #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
    UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },

    /// A key-value pair did not fit even into a freshly created, empty node.
    #[error("Could not push to new leaf node")]
    FailedToPushToNewLeafNode,

    /// An I/O error, converted automatically by `?` from `io::Error`.
    #[error("IoError: {0}")]
    Io(#[from] io::Error),
}

/// Result type used throughout this module, with [`DiskBtreeError`] as the error.
pub type Result<T> = result::Result<T, DiskBtreeError>;
117 :
118 : /// This is the on-disk representation.
119 : struct OnDiskNode<'a, const L: usize> {
120 : // Fixed-width fields
121 : num_children: u16,
122 : level: u8,
123 : prefix_len: u8,
124 : suffix_len: u8,
125 :
126 : // Variable-length fields. These are stored on-disk after the fixed-width
127 : // fields, in this order. In the in-memory representation, these point to
128 : // the right parts in the page buffer.
129 : prefix: &'a [u8],
130 : keys: &'a [u8],
131 : values: &'a [u8],
132 : }
133 :
134 : impl<'a, const L: usize> OnDiskNode<'a, L> {
135 : ///
136 : /// Interpret a PAGE_SZ page as a node.
137 : ///
138 1543145 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
139 1543145 : let mut cursor = std::io::Cursor::new(buf);
140 1543145 : let num_children = cursor.read_u16::<BE>()?;
141 1543145 : let level = cursor.read_u8()?;
142 1543145 : let prefix_len = cursor.read_u8()?;
143 1543145 : let suffix_len = cursor.read_u8()?;
144 :
145 1543145 : let mut off = cursor.position();
146 1543145 : let prefix_off = off as usize;
147 1543145 : off += prefix_len as u64;
148 1543145 :
149 1543145 : let keys_off = off as usize;
150 1543145 : let keys_len = num_children as usize * suffix_len as usize;
151 1543145 : off += keys_len as u64;
152 1543145 :
153 1543145 : let values_off = off as usize;
154 1543145 : let values_len = num_children as usize * VALUE_SZ;
155 1543145 : //off += values_len as u64;
156 1543145 :
157 1543145 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
158 1543145 : let keys = &buf[keys_off..keys_off + keys_len];
159 1543145 : let values = &buf[values_off..values_off + values_len];
160 1543145 :
161 1543145 : Ok(OnDiskNode {
162 1543145 : num_children,
163 1543145 : level,
164 1543145 : prefix_len,
165 1543145 : suffix_len,
166 1543145 : prefix,
167 1543145 : keys,
168 1543145 : values,
169 1543145 : })
170 1543145 : }
171 :
172 : ///
173 : /// Read a value at 'idx'
174 : ///
175 4704359 : fn value(&self, idx: usize) -> Value {
176 4704359 : let value_off = idx * VALUE_SZ;
177 4704359 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
178 4704359 : Value::from_slice(value_slice)
179 4704359 : }
180 :
181 1331823 : fn binary_search(
182 1331823 : &self,
183 1331823 : search_key: &[u8; L],
184 1331823 : keybuf: &mut [u8],
185 1331823 : ) -> result::Result<usize, usize> {
186 1331823 : let mut size = self.num_children as usize;
187 1331823 : let mut low = 0;
188 1331823 : let mut high = size;
189 9939409 : while low < high {
190 8820431 : let mid = low + size / 2;
191 8820431 :
192 8820431 : let key_off = mid * self.suffix_len as usize;
193 8820431 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
194 8820431 : // Does this match?
195 8820431 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
196 8820431 :
197 8820431 : let cmp = keybuf[..].cmp(search_key);
198 8820431 :
199 8820431 : if cmp == Ordering::Less {
200 5591405 : low = mid + 1;
201 5591405 : } else if cmp == Ordering::Greater {
202 3016181 : high = mid;
203 3016181 : } else {
204 212845 : return Ok(mid);
205 : }
206 8607586 : size = high - low;
207 : }
208 1118978 : Err(low)
209 1331823 : }
210 : }
211 :
///
/// Public reader object, to search the tree.
///
/// `L` is the key length in bytes.
///
#[derive(Clone)]
pub struct DiskBtreeReader<R, const L: usize>
where
    R: BlockReader,
{
    /// Added to every node block number before the block is read from `reader`.
    start_blk: u32,
    /// Block number of the root node (before `start_blk` is added).
    root_blk: u32,
    /// Source of the page blocks.
    reader: R,
}
224 :
/// Scan direction for `DiskBtreeReader::visit`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VisitDirection {
    /// Visit keys >= the search key, in increasing order.
    Forwards,
    /// Visit keys <= the search key, in decreasing order.
    Backwards,
}
230 :
231 : impl<R, const L: usize> DiskBtreeReader<R, L>
232 : where
233 : R: BlockReader,
234 : {
235 212424 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
236 212424 : DiskBtreeReader {
237 212424 : start_blk,
238 212424 : root_blk,
239 212424 : reader,
240 212424 : }
241 212424 : }
242 :
243 : ///
244 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
245 : ///
246 410230 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
247 410230 : let mut result: Option<u64> = None;
248 410230 : self.visit(
249 410230 : search_key,
250 410230 : VisitDirection::Forwards,
251 410230 : |key, value| {
252 210206 : if key == search_key {
253 208197 : result = Some(value);
254 208197 : }
255 210206 : false
256 410230 : },
257 410230 : ctx,
258 410230 : )
259 406 : .await?;
260 410230 : Ok(result)
261 410230 : }
262 :
263 58 : pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
264 58 : where
265 58 : R: 'a,
266 58 : {
267 58 : DiskBtreeIterator {
268 58 : stream: Box::pin(self.into_stream(start_key, ctx)),
269 58 : }
270 58 : }
271 :
272 : /// Return a stream which yields all key, value pairs from the index
273 : /// starting from the first key greater or equal to `start_key`.
274 : ///
275 : /// Note 1: that this is a copy of [`Self::visit`].
276 : /// TODO: Once the sequential read path is removed this will become
277 : /// the only index traversal method.
278 : ///
279 : /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
280 : /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
281 : /// requires the reader to be present when the stream is used, and this creates a lifetime
282 : /// dependency between the reader and the stream. Now if we want to create an iterator that
283 : /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
284 : /// to use from the image/delta layer APIs.
285 : ///
286 : /// Feel free to add the `&self` variant back if it's necessary.
287 64463 : pub fn into_stream<'a>(
288 64463 : self,
289 64463 : start_key: &'a [u8; L],
290 64463 : ctx: &'a RequestContext,
291 64463 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
292 64463 : where
293 64463 : R: 'a,
294 64463 : {
295 : try_stream! {
296 : let mut stack = Vec::new();
297 : stack.push((self.root_blk, None));
298 : let block_cursor = self.reader.block_cursor();
299 : while let Some((node_blknum, opt_iter)) = stack.pop() {
300 : // Locate the node.
301 : let node_buf = block_cursor
302 : .read_blk(self.start_blk + node_blknum, ctx)
303 : .await?;
304 :
305 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
306 : let prefix_len = node.prefix_len as usize;
307 : let suffix_len = node.suffix_len as usize;
308 :
309 : assert!(node.num_children > 0);
310 :
311 : let mut keybuf = Vec::new();
312 : keybuf.extend(node.prefix);
313 : keybuf.resize(prefix_len + suffix_len, 0);
314 :
315 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
316 : iter
317 : } else {
318 : // Locate the first match
319 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
320 : Ok(idx) => idx,
321 : Err(idx) => {
322 : if node.level == 0 {
323 : // Imagine that the node contains the following keys:
324 : //
325 : // 1
326 : // 3 <-- idx
327 : // 5
328 : //
329 : // If the search key is '2' and there is exact match,
330 : // the binary search would return the index of key
331 : // '3'. That's cool, '3' is the first key to return.
332 : idx
333 : } else {
334 : // This is an internal page, so each key represents a lower
335 : // bound for what's in the child page. If there is no exact
336 : // match, we have to return the *previous* entry.
337 : //
338 : // 1 <-- return this
339 : // 3 <-- idx
340 : // 5
341 : idx.saturating_sub(1)
342 : }
343 : }
344 : };
345 : Either::Left(idx..node.num_children.into())
346 : };
347 :
348 : // idx points to the first match now. Keep going from there
349 : while let Some(idx) = iter.next() {
350 : let key_off = idx * suffix_len;
351 : let suffix = &node.keys[key_off..key_off + suffix_len];
352 : keybuf[prefix_len..].copy_from_slice(suffix);
353 : let value = node.value(idx);
354 : #[allow(clippy::collapsible_if)]
355 : if node.level == 0 {
356 : // leaf
357 : yield (keybuf.clone(), value.to_u64());
358 : } else {
359 : stack.push((node_blknum, Some(iter)));
360 : stack.push((value.to_blknum(), None));
361 : break;
362 : }
363 : }
364 : }
365 : }
366 64463 : }
367 :
368 : ///
369 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
370 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
371 : /// backwards)
372 : ///
373 623108 : pub async fn visit<V>(
374 623108 : &self,
375 623108 : search_key: &[u8; L],
376 623108 : dir: VisitDirection,
377 623108 : mut visitor: V,
378 623108 : ctx: &RequestContext,
379 623108 : ) -> Result<bool>
380 623108 : where
381 623108 : V: FnMut(&[u8], u64) -> bool,
382 623108 : {
383 623108 : let mut stack = Vec::new();
384 623108 : stack.push((self.root_blk, None));
385 623108 : let block_cursor = self.reader.block_cursor();
386 1628391 : while let Some((node_blknum, opt_iter)) = stack.pop() {
387 : // Locate the node.
388 1427841 : let node_buf = block_cursor
389 1427841 : .read_blk(self.start_blk + node_blknum, ctx)
390 23563 : .await?;
391 :
392 1427841 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
393 1427841 : let prefix_len = node.prefix_len as usize;
394 1427841 : let suffix_len = node.suffix_len as usize;
395 1427841 :
396 1427841 : assert!(node.num_children > 0);
397 :
398 1427841 : let mut keybuf = Vec::new();
399 1427841 : keybuf.extend(node.prefix);
400 1427841 : keybuf.resize(prefix_len + suffix_len, 0);
401 :
402 1427841 : let mut iter = if let Some(iter) = opt_iter {
403 203898 : iter
404 1223943 : } else if dir == VisitDirection::Forwards {
405 : // Locate the first match
406 817700 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
407 210407 : Ok(idx) => idx,
408 607293 : Err(idx) => {
409 607293 : if node.level == 0 {
410 : // Imagine that the node contains the following keys:
411 : //
412 : // 1
413 : // 3 <-- idx
414 : // 5
415 : //
416 : // If the search key is '2' and there is exact match,
417 : // the binary search would return the index of key
418 : // '3'. That's cool, '3' is the first key to return.
419 208119 : idx
420 : } else {
421 : // This is an internal page, so each key represents a lower
422 : // bound for what's in the child page. If there is no exact
423 : // match, we have to return the *previous* entry.
424 : //
425 : // 1 <-- return this
426 : // 3 <-- idx
427 : // 5
428 399174 : idx.saturating_sub(1)
429 : }
430 : }
431 : };
432 817700 : Either::Left(idx..node.num_children.into())
433 : } else {
434 406243 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
435 2204 : Ok(idx) => {
436 2204 : // Exact match. That's the first entry to return, and walk
437 2204 : // backwards from there.
438 2204 : idx
439 : }
440 404039 : Err(idx) => {
441 : // No exact match. The binary search returned the index of the
442 : // first key that's > search_key. Back off by one, and walk
443 : // backwards from there.
444 404039 : if let Some(idx) = idx.checked_sub(1) {
445 397982 : idx
446 : } else {
447 6057 : return Ok(false);
448 : }
449 : }
450 : };
451 400186 : Either::Right((0..=idx).rev())
452 : };
453 :
454 : // idx points to the first match now. Keep going from there
455 3565774 : while let Some(idx) = iter.next() {
456 3161326 : let key_off = idx * suffix_len;
457 3161326 : let suffix = &node.keys[key_off..key_off + suffix_len];
458 3161326 : keybuf[prefix_len..].copy_from_slice(suffix);
459 3161326 : let value = node.value(idx);
460 3161326 : #[allow(clippy::collapsible_if)]
461 3161326 : if node.level == 0 {
462 : // leaf
463 2560491 : if !visitor(&keybuf, value.to_u64()) {
464 416501 : return Ok(false);
465 2143990 : }
466 : } else {
467 600835 : stack.push((node_blknum, Some(iter)));
468 600835 : stack.push((value.to_blknum(), None));
469 600835 : break;
470 : }
471 : }
472 : }
473 200550 : Ok(true)
474 623108 : }
475 :
476 : #[allow(dead_code)]
477 10 : pub async fn dump(&self) -> Result<()> {
478 10 : let mut stack = Vec::new();
479 10 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
480 10 :
481 10 : stack.push((self.root_blk, String::new(), 0, 0, 0));
482 10 :
483 10 : let block_cursor = self.reader.block_cursor();
484 :
485 6042 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
486 6032 : let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
487 6032 : let buf: &[u8] = blk.as_ref();
488 6032 : let node = OnDiskNode::<L>::deparse(buf)?;
489 :
490 6032 : if child_idx == 0 {
491 18 : print!("{:indent$}", "", indent = depth * 2);
492 18 : let path_prefix = stack
493 18 : .iter()
494 18 : .map(|(_blknum, path, ..)| path.as_str())
495 18 : .collect::<String>();
496 18 : println!(
497 18 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
498 18 : hex::encode(node.prefix),
499 18 : node.suffix_len
500 18 : );
501 6014 : }
502 :
503 6032 : if child_idx + 1 < node.num_children {
504 6014 : let key_off = key_off + node.suffix_len as usize;
505 6014 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
506 6014 : }
507 6032 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
508 6032 : let val = node.value(child_idx as usize);
509 6032 :
510 6032 : print!("{:indent$}", "", indent = depth * 2 + 2);
511 6032 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
512 6032 :
513 6032 : if node.level > 0 {
514 8 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
515 6024 : }
516 : }
517 10 : Ok(())
518 10 : }
519 : }
520 :
521 : pub struct DiskBtreeIterator<'a> {
522 : #[allow(clippy::type_complexity)]
523 : stream: std::pin::Pin<
524 : Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
525 : >,
526 : }
527 :
528 : impl<'a> DiskBtreeIterator<'a> {
529 223464 : pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
530 223464 : self.stream.next().await
531 223464 : }
532 : }
533 :
///
/// Public builder object, for creating a new tree.
///
/// Usage: Create a builder object by calling 'new', load all the data into the
/// tree by calling 'append' for each key-value pair, and then call 'finish'
///
/// 'L' is the key length in bytes
pub struct DiskBtreeBuilder<W, const L: usize>
where
    W: BlockWriter,
{
    /// Destination for finished pages; each flushed node is written as one block.
    writer: W,

    ///
    /// `stack[0]` is the current root page, `stack.last()` is the leaf.
    ///
    /// We maintain the length of the stack to be always greater than zero.
    /// Two exceptions are:
    /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
    ///    Because other methods cannot see the intermediate state, the invariant still holds.
    /// 2. `Self::finish`. It consumes self and does not return it back,
    ///    which means that this is where the structure is destroyed.
    ///    Thus a stack of zero length cannot be observed by other methods.
    stack: Vec<BuildNode<L>>,

    /// Last key that was appended to the tree. Used to sanity check that append
    /// is called in increasing key order.
    last_key: Option<[u8; L]>,
}
563 :
564 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
565 : where
566 : W: BlockWriter,
567 : {
568 1582 : pub fn new(writer: W) -> Self {
569 1582 : DiskBtreeBuilder {
570 1582 : writer,
571 1582 : last_key: None,
572 1582 : stack: vec![BuildNode::new(0)],
573 1582 : }
574 1582 : }
575 :
576 7189675 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
577 7189675 : if value > MAX_VALUE {
578 0 : return Err(DiskBtreeError::AppendOverflow(value));
579 7189675 : }
580 7189675 : if let Some(last_key) = &self.last_key {
581 7188099 : if key <= last_key {
582 2 : return Err(DiskBtreeError::UnsortedInput {
583 2 : key: key.as_slice().into(),
584 2 : last_key: last_key.as_slice().into(),
585 2 : });
586 7188097 : }
587 1576 : }
588 7189673 : self.last_key = Some(*key);
589 7189673 :
590 7189673 : self.append_internal(key, Value::from_u64(value))
591 7189675 : }
592 :
593 7202540 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
594 7202540 : // Try to append to the current leaf buffer
595 7202540 : let last = self
596 7202540 : .stack
597 7202540 : .last_mut()
598 7202540 : .expect("should always have at least one item");
599 7202540 : let level = last.level;
600 7202540 : if last.push(key, value) {
601 7178085 : return Ok(());
602 24455 : }
603 24455 :
604 24455 : // It did not fit. Try to compress, and if it succeeds to make
605 24455 : // some room on the node, try appending to it again.
606 24455 : #[allow(clippy::collapsible_if)]
607 24455 : if last.compress() {
608 12383 : if last.push(key, value) {
609 12376 : return Ok(());
610 7 : }
611 12072 : }
612 :
613 : // Could not append to the current leaf. Flush it and create a new one.
614 12079 : self.flush_node()?;
615 :
616 : // Replace the node we flushed with an empty one and append the new
617 : // key to it.
618 12079 : let mut last = BuildNode::new(level);
619 12079 : if !last.push(key, value) {
620 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
621 12079 : }
622 12079 :
623 12079 : self.stack.push(last);
624 12079 :
625 12079 : Ok(())
626 7202540 : }
627 :
628 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
629 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
630 : /// creates a new root page, increasing the height of the tree.
631 12867 : fn flush_node(&mut self) -> Result<()> {
632 12867 : // Get the current bottommost node in the stack and flush it to disk.
633 12867 : let last = self
634 12867 : .stack
635 12867 : .pop()
636 12867 : .expect("should always have at least one item");
637 12867 : let buf = last.pack();
638 12867 : let downlink_key = last.first_key();
639 12867 : let downlink_ptr = self.writer.write_blk(buf)?;
640 :
641 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
642 : // create a new root page, increasing the height of the tree.
643 12867 : if self.stack.is_empty() {
644 788 : self.stack.push(BuildNode::new(last.level + 1));
645 12079 : }
646 12867 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
647 12867 : }
648 :
649 : ///
650 : /// Flushes everything to disk, and returns the block number of the root page.
651 : /// The caller must store the root block number "out-of-band", and pass it
652 : /// to the DiskBtreeReader::new() when you want to read the tree again.
653 : /// (In the image and delta layers, it is stored in the beginning of the file,
654 : /// in the summary header)
655 : ///
656 1574 : pub fn finish(mut self) -> Result<(u32, W)> {
657 : // flush all levels, except the root.
658 2362 : while self.stack.len() > 1 {
659 788 : self.flush_node()?;
660 : }
661 :
662 1574 : let root = self
663 1574 : .stack
664 1574 : .first()
665 1574 : .expect("by the check above we left one item there");
666 1574 : let buf = root.pack();
667 1574 : let root_blknum = self.writer.write_blk(buf)?;
668 :
669 1574 : Ok((root_blknum, self.writer))
670 1574 : }
671 :
    /// Shared access to the underlying block writer.
    pub fn borrow_writer(&self) -> &W {
        &self.writer
    }
675 : }
676 :
///
/// BuildNode represents an incomplete page that we are appending to.
///
#[derive(Clone, Debug)]
struct BuildNode<const L: usize> {
    // Number of key-value pairs appended so far.
    num_children: u16,
    // Tree level of this node; the builder starts with level 0 and creates each
    // new root one level above the node it replaces.
    level: u8,
    // Common key prefix removed from `keys` by `compress`; empty before that.
    prefix: Vec<u8>,
    // Length in bytes of each key suffix stored in `keys`.
    suffix_len: usize,

    // Concatenated key suffixes, `suffix_len` bytes per child.
    keys: Vec<u8>,
    // Concatenated encoded values, `VALUE_SZ` bytes per child.
    values: Vec<u8>,

    size: usize, // physical size of this node, if it was written to disk like this
}
692 :
// Maximum physical size of a node: exactly one page.
const NODE_SIZE: usize = PAGE_SZ;

// Size of the fixed-width node header: num_children (2 bytes), then level,
// prefix length and suffix length (1 byte each).
const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
696 :
697 : impl<const L: usize> BuildNode<L> {
698 14449 : fn new(level: u8) -> Self {
699 14449 : BuildNode {
700 14449 : num_children: 0,
701 14449 : level,
702 14449 : prefix: Vec::new(),
703 14449 : suffix_len: 0,
704 14449 : keys: Vec::new(),
705 14449 : values: Vec::new(),
706 14449 : size: NODE_HDR_SIZE,
707 14449 : }
708 14449 : }
709 :
710 : /// Try to append a key-value pair to this node. Returns 'true' on
711 : /// success, 'false' if the page was full or the key was
712 : /// incompatible with the prefix of the existing keys.
713 7227002 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
714 7227002 : // If we have already performed prefix-compression on the page,
715 7227002 : // check that the incoming key has the same prefix.
716 7227002 : if self.num_children > 0 {
717 : // does the prefix allow it?
718 7212559 : if !key.starts_with(&self.prefix) {
719 221 : return false;
720 7212338 : }
721 14443 : } else {
722 14443 : self.suffix_len = key.len();
723 14443 : }
724 :
725 : // Is the node too full?
726 7226781 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
727 24241 : return false;
728 7202540 : }
729 7202540 :
730 7202540 : // All clear
731 7202540 : self.num_children += 1;
732 7202540 : self.keys.extend(&key[self.prefix.len()..]);
733 7202540 : self.values.extend(value.0);
734 7202540 :
735 7202540 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
736 7202540 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
737 :
738 7202540 : self.size += self.suffix_len + VALUE_SZ;
739 7202540 :
740 7202540 : true
741 7227002 : }
742 :
743 : ///
744 : /// Perform prefix-compression.
745 : ///
746 : /// Returns 'true' on success, 'false' if no compression was possible.
747 : ///
748 24455 : fn compress(&mut self) -> bool {
749 24455 : let first_suffix = self.first_suffix();
750 24455 : let last_suffix = self.last_suffix();
751 24455 :
752 24455 : // Find the common prefix among all keys
753 24455 : let mut prefix_len = 0;
754 221911 : while prefix_len < self.suffix_len {
755 221911 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
756 24455 : break;
757 197456 : }
758 197456 : prefix_len += 1;
759 : }
760 24455 : if prefix_len == 0 {
761 12072 : return false;
762 12383 : }
763 12383 :
764 12383 : // Can compress. Rewrite the keys without the common prefix.
765 12383 : self.prefix.extend(&self.keys[..prefix_len]);
766 12383 :
767 12383 : let mut new_keys = Vec::new();
768 12383 : let mut key_off = 0;
769 3348892 : while key_off < self.keys.len() {
770 3336509 : let next_key_off = key_off + self.suffix_len;
771 3336509 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
772 3336509 : key_off = next_key_off;
773 3336509 : }
774 12383 : self.keys = new_keys;
775 12383 : self.suffix_len -= prefix_len;
776 12383 :
777 12383 : self.size -= prefix_len * self.num_children as usize;
778 12383 : self.size += prefix_len;
779 12383 :
780 12383 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
781 12383 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
782 :
783 12383 : true
784 24455 : }
785 :
786 : ///
787 : /// Serialize the node to on-disk format.
788 : ///
789 14441 : fn pack(&self) -> Bytes {
790 14441 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
791 14441 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
792 14441 : assert!(self.num_children > 0);
793 :
794 14441 : let mut buf = BytesMut::new();
795 14441 :
796 14441 : buf.put_u16(self.num_children);
797 14441 : buf.put_u8(self.level);
798 14441 : buf.put_u8(self.prefix.len() as u8);
799 14441 : buf.put_u8(self.suffix_len as u8);
800 14441 : buf.put(&self.prefix[..]);
801 14441 : buf.put(&self.keys[..]);
802 14441 : buf.put(&self.values[..]);
803 14441 :
804 14441 : assert!(buf.len() == self.size);
805 :
806 14441 : assert!(buf.len() <= PAGE_SZ);
807 14441 : buf.resize(PAGE_SZ, 0);
808 14441 : buf.freeze()
809 14441 : }
810 :
    /// The stored suffix (key without the common prefix) of the first child.
    fn first_suffix(&self) -> &[u8] {
        &self.keys[..self.suffix_len]
    }
    /// The stored suffix (key without the common prefix) of the last child.
    fn last_suffix(&self) -> &[u8] {
        &self.keys[self.keys.len() - self.suffix_len..]
    }
817 :
818 : /// Return the full first key of the page, including the prefix
819 12867 : fn first_key(&self) -> [u8; L] {
820 12867 : let mut key = [0u8; L];
821 12867 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
822 12867 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
823 12867 : key
824 12867 : }
825 : }
826 :
827 : #[cfg(test)]
828 : pub(crate) mod tests {
829 : use super::*;
830 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
831 : use rand::Rng;
832 : use std::collections::BTreeMap;
833 : use std::sync::atomic::{AtomicUsize, Ordering};
834 :
    /// In-memory stand-in for a block device, used by the tests: block number
    /// `n` is simply `blocks[n]`.
    #[derive(Clone, Default)]
    pub(crate) struct TestDisk {
        blocks: Vec<Bytes>,
    }
    impl TestDisk {
        /// Create an empty disk.
        fn new() -> Self {
            Self::default()
        }
        /// Return a copy of block `blknum`.
        ///
        /// Panics if the block does not exist or is not exactly PAGE_SZ bytes long.
        pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
            let mut buf = [0u8; PAGE_SZ];
            buf.copy_from_slice(&self.blocks[blknum as usize]);
            Ok(std::sync::Arc::new(buf).into())
        }
    }
    impl BlockReader for TestDisk {
        fn block_cursor(&self) -> BlockCursor<'_> {
            BlockCursor::new(BlockReaderRef::TestDisk(self))
        }
    }
    impl BlockWriter for &mut TestDisk {
        /// Append `buf` as a new block and return its block number.
        fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
            let blknum = self.blocks.len();
            self.blocks.push(buf);
            Ok(blknum as u32)
        }
    }
861 :
    /// Build a small tree with 6-byte keys, then check `get`, forward and
    /// backward `visit`, and a full scan against the input data.
    #[tokio::test]
    async fn basic() -> Result<()> {
        let mut disk = TestDisk::new();
        let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Keys are listed in increasing order, as `append` requires.
        let all_keys: Vec<&[u8; 6]> = vec![
            b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
        ];
        // Each key's value is its index in the list.
        let all_data: Vec<(&[u8; 6], u64)> = all_keys
            .iter()
            .enumerate()
            .map(|(idx, key)| (*key, idx as u64))
            .collect();
        for (key, val) in all_data.iter() {
            writer.append(key, *val)?;
        }

        let (root_offset, _writer) = writer.finish()?;

        let reader = DiskBtreeReader::new(0, root_offset, disk);

        reader.dump().await?;

        // Test the `get` function on all the keys.
        for (key, val) in all_data.iter() {
            assert_eq!(reader.get(key, &ctx).await?, Some(*val));
        }
        // And on some keys that don't exist
        assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
        assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
        assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);

        // Test search with `visit` function
        let search_key = b"xabaaa";
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] >= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();

        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Test a backwards scan
        let mut expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .filter(|(key, _value)| key[..] <= search_key[..])
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        expected.reverse();
        let mut data = Vec::new();
        reader
            .visit(
                search_key,
                VisitDirection::Backwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        // Backward scan where nothing matches
        reader
            .visit(
                b"aaaaaa",
                VisitDirection::Backwards,
                |key, value| {
                    panic!("found unexpected key {}: {}", hex::encode(key), value);
                },
                &ctx,
            )
            .await?;

        // Full scan: an all-zero search key sorts before every stored key.
        let expected: Vec<(Vec<u8>, u64)> = all_data
            .iter()
            .map(|(key, value)| (key.to_vec(), *value))
            .collect();
        let mut data = Vec::new();
        reader
            .visit(
                &[0u8; 6],
                VisitDirection::Forwards,
                |key, value| {
                    data.push((key.to_vec(), value));
                    true
                },
                &ctx,
            )
            .await?;
        assert_eq!(data, expected);

        Ok(())
    }
972 :
973 : #[tokio::test]
974 2 : async fn lots_of_keys() -> Result<()> {
975 2 : let mut disk = TestDisk::new();
976 2 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
977 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
978 2 :
979 2 : const NUM_KEYS: u64 = 1000;
980 2 :
981 2 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
982 2 :
983 2002 : for idx in 0..NUM_KEYS {
984 2000 : let key_int: u64 = 1 + idx * 2;
985 2000 : let key = u64::to_be_bytes(key_int);
986 2000 : writer.append(&key, idx)?;
987 2 :
988 2000 : all_data.insert(key_int, idx);
989 2 : }
990 2 :
991 2 : let (root_offset, _writer) = writer.finish()?;
992 2 :
993 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
994 2 :
995 2 : reader.dump().await?;
996 2 :
997 2 : use std::sync::Mutex;
998 2 :
999 2 : let result = Mutex::new(Vec::new());
1000 2 : let limit: AtomicUsize = AtomicUsize::new(10);
1001 83820 : let take_ten = |key: &[u8], value: u64| {
1002 83820 : let mut keybuf = [0u8; 8];
1003 83820 : keybuf.copy_from_slice(key);
1004 83820 : let key_int = u64::from_be_bytes(keybuf);
1005 83820 :
1006 83820 : let mut result = result.lock().unwrap();
1007 83820 : result.push((key_int, value));
1008 83820 :
1009 83820 : // keep going until we have 10 matches
1010 83820 : result.len() < limit.load(Ordering::Relaxed)
1011 83820 : };
1012 2 :
1013 4020 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
1014 4020 : let search_key = u64::to_be_bytes(search_key_int);
1015 4020 : assert_eq!(
1016 4020 : reader.get(&search_key, &ctx).await?,
1017 4020 : all_data.get(&search_key_int).cloned()
1018 2 : );
1019 2 :
1020 2 : // Test a forward scan starting with this key
1021 4020 : result.lock().unwrap().clear();
1022 4020 : reader
1023 4020 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1024 2 : .await?;
1025 4020 : let expected = all_data
1026 4020 : .range(search_key_int..)
1027 4020 : .take(10)
1028 39820 : .map(|(&key, &val)| (key, val))
1029 4020 : .collect::<Vec<(u64, u64)>>();
1030 4020 : assert_eq!(*result.lock().unwrap(), expected);
1031 2 :
1032 2 : // And a backwards scan
1033 4020 : result.lock().unwrap().clear();
1034 4020 : reader
1035 4020 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1036 2 : .await?;
1037 4020 : let expected = all_data
1038 4020 : .range(..=search_key_int)
1039 4020 : .rev()
1040 4020 : .take(10)
1041 40000 : .map(|(&key, &val)| (key, val))
1042 4020 : .collect::<Vec<(u64, u64)>>();
1043 4020 : assert_eq!(*result.lock().unwrap(), expected);
1044 2 : }
1045 2 :
1046 2 : // full scan
1047 2 : let search_key = u64::to_be_bytes(0);
1048 2 : limit.store(usize::MAX, Ordering::Relaxed);
1049 2 : result.lock().unwrap().clear();
1050 2 : reader
1051 2 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1052 2 : .await?;
1053 2 : let expected = all_data
1054 2 : .iter()
1055 2000 : .map(|(&key, &val)| (key, val))
1056 2 : .collect::<Vec<(u64, u64)>>();
1057 2 : assert_eq!(*result.lock().unwrap(), expected);
1058 2 :
1059 2 : // full scan
1060 2 : let search_key = u64::to_be_bytes(u64::MAX);
1061 2 : limit.store(usize::MAX, Ordering::Relaxed);
1062 2 : result.lock().unwrap().clear();
1063 2 : reader
1064 2 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1065 2 : .await?;
1066 2 : let expected = all_data
1067 2 : .iter()
1068 2 : .rev()
1069 2000 : .map(|(&key, &val)| (key, val))
1070 2 : .collect::<Vec<(u64, u64)>>();
1071 2 : assert_eq!(*result.lock().unwrap(), expected);
1072 2 :
1073 2 : Ok(())
1074 2 : }
1075 :
1076 : #[tokio::test]
1077 2 : async fn random_data() -> Result<()> {
1078 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1079 2 :
1080 2 : // Generate random keys with exponential distribution, to
1081 2 : // exercise the prefix compression
1082 2 : const NUM_KEYS: usize = 100000;
1083 2 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1084 200002 : for idx in 0..NUM_KEYS {
1085 200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1086 200000 : let t = -(f64::ln(u));
1087 200000 : let key_int = (t * 1000000.0) as u128;
1088 200000 :
1089 200000 : all_data.insert(key_int, idx as u64);
1090 200000 : }
1091 2 :
1092 2 : // Build a tree from it
1093 2 : let mut disk = TestDisk::new();
1094 2 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1095 2 :
1096 195136 : for (&key, &val) in all_data.iter() {
1097 195136 : writer.append(&u128::to_be_bytes(key), val)?;
1098 2 : }
1099 2 : let (root_offset, _writer) = writer.finish()?;
1100 2 :
1101 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1102 2 :
1103 2 : // Test get() operation on all the keys
1104 195136 : for (&key, &val) in all_data.iter() {
1105 195136 : let search_key = u128::to_be_bytes(key);
1106 195136 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1107 2 : }
1108 2 :
1109 2 : // Test get() operations on random keys, most of which will not exist
1110 200002 : for _ in 0..100000 {
1111 200000 : let key_int = rand::thread_rng().gen::<u128>();
1112 200000 : let search_key = u128::to_be_bytes(key_int);
1113 200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1114 2 : }
1115 2 :
1116 2 : // Test boundary cases
1117 2 : assert!(
1118 2 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1119 2 : == all_data.get(&u128::MIN).cloned()
1120 2 : );
1121 2 : assert!(
1122 2 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1123 2 : == all_data.get(&u128::MAX).cloned()
1124 2 : );
1125 2 :
1126 2 : // Test iterator and get_stream API
1127 2 : let mut iter = reader.iter(&[0; 16], &ctx);
1128 2 : let mut cnt = 0;
1129 195138 : while let Some(res) = iter.next().await {
1130 195136 : let (key, val) = res?;
1131 195136 : let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
1132 195136 : assert_eq!(val, *all_data.get(&key).unwrap());
1133 195136 : cnt += 1;
1134 2 : }
1135 2 : assert_eq!(cnt, all_data.len());
1136 2 :
1137 2 : Ok(())
1138 2 : }
1139 :
/// Checks that appending a key which sorts before the previously appended key
/// fails with `DiskBtreeError::UnsortedInput`, and that the error carries both
/// the offending key and the last accepted key.
#[test]
fn unsorted_input() {
    let mut disk = TestDisk::new();
    let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

    // The setup appends are in sorted order and must succeed. Checking them
    // keeps a setup failure from surfacing later as a confusing `last_key`
    // mismatch.
    writer
        .append(b"ba", 1)
        .expect("in-order append of first key should succeed");
    writer
        .append(b"bb", 2)
        .expect("in-order append of second key should succeed");

    // "aa" sorts before "bb", so this append has to be rejected.
    let err = writer.append(b"aa", 3).expect_err("should've failed");
    match err {
        DiskBtreeError::UnsortedInput { key, last_key } => {
            assert_eq!(key.as_ref(), b"aa".as_slice());
            assert_eq!(last_key.as_ref(), b"bb".as_slice());
        }
        _ => panic!("unexpected error variant, expected DiskBtreeError::UnsortedInput"),
    }
}
1156 :
1157 : ///
1158 : /// This test contains a particular data set, see disk_btree_test_data.rs
1159 : ///
1160 : #[tokio::test]
1161 2 : async fn particular_data() -> Result<()> {
1162 2 : // Build a tree from it
1163 2 : let mut disk = TestDisk::new();
1164 2 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1165 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1166 2 :
1167 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1168 4000 : writer.append(&key, val)?;
1169 2 : }
1170 2 : let (root_offset, writer) = writer.finish()?;
1171 2 :
1172 2 : println!("SIZE: {} blocks", writer.blocks.len());
1173 2 :
1174 2 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1175 2 :
1176 2 : // Test get() operation on all the keys
1177 4002 : for (key, val) in disk_btree_test_data::TEST_DATA {
1178 4000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1179 2 : }
1180 2 :
1181 2 : // Test full scan
1182 2 : let mut count = 0;
1183 2 : reader
1184 2 : .visit(
1185 2 : &[0u8; 26],
1186 2 : VisitDirection::Forwards,
1187 4000 : |_key, _value| {
1188 4000 : count += 1;
1189 4000 : true
1190 4000 : },
1191 2 : &ctx,
1192 2 : )
1193 2 : .await?;
1194 2 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1195 2 :
1196 2 : reader.dump().await?;
1197 2 :
1198 2 : Ok(())
1199 2 : }
1200 : }
1201 :
1202 : #[cfg(test)]
1203 : #[path = "disk_btree_test_data.rs"]
1204 : mod disk_btree_test_data;
|