Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use std::cmp::Ordering;
22 : use std::iter::Rev;
23 : use std::ops::{Range, RangeInclusive};
24 : use std::{io, result};
25 :
26 : use async_stream::try_stream;
27 : use byteorder::{BE, ReadBytesExt};
28 : use bytes::BufMut;
29 : use either::Either;
30 : use futures::{Stream, StreamExt};
31 : use hex;
32 : use thiserror::Error;
33 : use tracing::error;
34 :
35 : use crate::context::RequestContext;
36 : use crate::tenant::block_io::{BlockReader, BlockWriter};
37 : use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
38 :
/// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
/// Largest offset accepted by [`Value::from_u64`]. It is 39 bits wide: the top
/// bit of the first byte is left clear so that it can tag block-number values
/// (see [`Value::from_blknum`]).
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;

/// Size of a page buffer; every node is packed into one page of this size.
pub const PAGE_SZ: usize = 8192;

/// A fixed-width, big-endian encoded value as it is stored in a node.
///
/// Two kinds of values share this representation:
/// - offsets, created with [`Value::from_u64`]; the top bit of byte 0 is clear
/// - block numbers, created with [`Value::from_blknum`]; byte 0 is `0x80`
#[derive(Clone, Copy, Debug)]
struct Value([u8; VALUE_SZ]);

impl Value {
    /// Copy a value out of a node's value array.
    ///
    /// Panics if `slice` is not exactly `VALUE_SZ` bytes long.
    fn from_slice(slice: &[u8]) -> Value {
        let mut b = [0u8; VALUE_SZ];
        b.copy_from_slice(slice);
        Value(b)
    }

    /// Encode an offset.
    ///
    /// Panics if `x` is larger than `MAX_VALUE`.
    fn from_u64(x: u64) -> Value {
        assert!(x <= MAX_VALUE);
        // The low five bytes of the big-endian representation are the value.
        let be = x.to_be_bytes();
        Value([be[3], be[4], be[5], be[6], be[7]])
    }

    /// Encode a block number, tagging it by setting the first byte to `0x80`.
    fn from_blknum(x: u32) -> Value {
        let be = x.to_be_bytes();
        Value([0x80, be[0], be[1], be[2], be[3]])
    }

    /// Returns true if this value is an offset (as opposed to a block number).
    ///
    /// Offsets never have the top bit of the first byte set, because
    /// `from_u64` rejects anything above `MAX_VALUE`; block numbers always do.
    #[allow(dead_code)]
    fn is_offset(self) -> bool {
        self.0[0] & 0x80 == 0
    }

    /// Decode the five bytes as a big-endian integer.
    fn to_u64(self) -> u64 {
        let b = &self.0;
        u64::from_be_bytes([0, 0, 0, b[0], b[1], b[2], b[3], b[4]])
    }

    /// Decode a block number.
    ///
    /// Panics if the value does not carry the `0x80` block-number tag.
    fn to_blknum(self) -> u32 {
        let b = &self.0;
        assert!(b[0] == 0x80);
        u32::from_be_bytes([b[1], b[2], b[3], b[4]])
    }
}
96 :
97 : #[derive(Error, Debug)]
98 : pub enum DiskBtreeError {
99 : #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
100 : AppendOverflow(u64),
101 :
102 : #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
103 : UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },
104 :
105 : #[error("Could not push to new leaf node")]
106 : FailedToPushToNewLeafNode,
107 :
108 : #[error("IoError: {0}")]
109 : Io(#[from] io::Error),
110 : }
111 :
112 : pub type Result<T> = result::Result<T, DiskBtreeError>;
113 :
114 : /// This is the on-disk representation.
115 : struct OnDiskNode<'a, const L: usize> {
116 : // Fixed-width fields
117 : num_children: u16,
118 : level: u8,
119 : prefix_len: u8,
120 : suffix_len: u8,
121 :
122 : // Variable-length fields. These are stored on-disk after the fixed-width
123 : // fields, in this order. In the in-memory representation, these point to
124 : // the right parts in the page buffer.
125 : prefix: &'a [u8],
126 : keys: &'a [u8],
127 : values: &'a [u8],
128 : }
129 :
130 : impl<const L: usize> OnDiskNode<'_, L> {
131 : ///
132 : /// Interpret a PAGE_SZ page as a node.
133 : ///
134 9476504 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
135 9476504 : let mut cursor = std::io::Cursor::new(buf);
136 9476504 : let num_children = cursor.read_u16::<BE>()?;
137 9476504 : let level = cursor.read_u8()?;
138 9476504 : let prefix_len = cursor.read_u8()?;
139 9476504 : let suffix_len = cursor.read_u8()?;
140 :
141 9476504 : let mut off = cursor.position();
142 9476504 : let prefix_off = off as usize;
143 9476504 : off += prefix_len as u64;
144 9476504 :
145 9476504 : let keys_off = off as usize;
146 9476504 : let keys_len = num_children as usize * suffix_len as usize;
147 9476504 : off += keys_len as u64;
148 9476504 :
149 9476504 : let values_off = off as usize;
150 9476504 : let values_len = num_children as usize * VALUE_SZ;
151 9476504 : //off += values_len as u64;
152 9476504 :
153 9476504 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
154 9476504 : let keys = &buf[keys_off..keys_off + keys_len];
155 9476504 : let values = &buf[values_off..values_off + values_len];
156 9476504 :
157 9476504 : Ok(OnDiskNode {
158 9476504 : num_children,
159 9476504 : level,
160 9476504 : prefix_len,
161 9476504 : suffix_len,
162 9476504 : prefix,
163 9476504 : keys,
164 9476504 : values,
165 9476504 : })
166 9476504 : }
167 :
168 : ///
169 : /// Read a value at 'idx'
170 : ///
171 54684992 : fn value(&self, idx: usize) -> Value {
172 54684992 : let value_off = idx * VALUE_SZ;
173 54684992 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
174 54684992 : Value::from_slice(value_slice)
175 54684992 : }
176 :
177 8109891 : fn binary_search(
178 8109891 : &self,
179 8109891 : search_key: &[u8; L],
180 8109891 : keybuf: &mut [u8],
181 8109891 : ) -> result::Result<usize, usize> {
182 8109891 : let mut size = self.num_children as usize;
183 8109891 : let mut low = 0;
184 8109891 : let mut high = size;
185 61207286 : while low < high {
186 54747149 : let mid = low + size / 2;
187 54747149 :
188 54747149 : let key_off = mid * self.suffix_len as usize;
189 54747149 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
190 54747149 : // Does this match?
191 54747149 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
192 54747149 :
193 54747149 : let cmp = keybuf[..].cmp(search_key);
194 54747149 :
195 54747149 : if cmp == Ordering::Less {
196 34336288 : low = mid + 1;
197 34336288 : } else if cmp == Ordering::Greater {
198 18761107 : high = mid;
199 18761107 : } else {
200 1649754 : return Ok(mid);
201 : }
202 53097395 : size = high - low;
203 : }
204 6460137 : Err(low)
205 8109891 : }
206 : }
207 :
208 : ///
209 : /// Public reader object, to search the tree.
210 : ///
211 : #[derive(Clone)]
212 : pub struct DiskBtreeReader<R, const L: usize>
213 : where
214 : R: BlockReader,
215 : {
216 : start_blk: u32,
217 : root_blk: u32,
218 : reader: R,
219 : }
220 :
/// Direction of a tree scan started from a search key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitDirection {
    /// Visit keys greater than or equal to the search key, in ascending order.
    Forwards,
    /// Visit keys less than or equal to the search key, in descending order.
    Backwards,
}
226 :
227 : impl<R, const L: usize> DiskBtreeReader<R, L>
228 : where
229 : R: BlockReader,
230 : {
231 1607647 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
232 1607647 : DiskBtreeReader {
233 1607647 : start_blk,
234 1607647 : root_blk,
235 1607647 : reader,
236 1607647 : }
237 1607647 : }
238 :
239 : ///
240 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
241 : ///
242 2418765 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
243 2418765 : let mut result: Option<u64> = None;
244 2418765 : self.visit(
245 2418765 : search_key,
246 2418765 : VisitDirection::Forwards,
247 2418765 : |key, value| {
248 1218621 : if key == search_key {
249 1206588 : result = Some(value);
250 1206588 : }
251 1218621 : false
252 2418765 : },
253 2418765 : ctx,
254 2418765 : )
255 2418765 : .await?;
256 2418765 : Ok(result)
257 2418765 : }
258 :
259 4224 : pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
260 4224 : where
261 4224 : R: 'a + Send,
262 4224 : {
263 4224 : DiskBtreeIterator {
264 4224 : stream: Box::pin(self.into_stream(start_key, ctx)),
265 4224 : }
266 4224 : }
267 :
268 : /// Return a stream which yields all key, value pairs from the index
269 : /// starting from the first key greater or equal to `start_key`.
270 : ///
271 : /// Note 1: that this is a copy of [`Self::visit`].
272 : /// TODO: Once the sequential read path is removed this will become
273 : /// the only index traversal method.
274 : ///
275 : /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
276 : /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
277 : /// requires the reader to be present when the stream is used, and this creates a lifetime
278 : /// dependency between the reader and the stream. Now if we want to create an iterator that
279 : /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
280 : /// to use from the image/delta layer APIs.
281 : ///
282 : /// Feel free to add the `&self` variant back if it's necessary.
283 1723459 : pub fn into_stream<'a>(
284 1723459 : self,
285 1723459 : start_key: &'a [u8; L],
286 1723459 : ctx: &'a RequestContext,
287 1723459 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
288 1723459 : where
289 1723459 : R: 'a,
290 1723459 : {
291 1723459 : try_stream! {
292 1723459 : let mut stack = Vec::new();
293 1723459 : stack.push((self.root_blk, None));
294 1723459 : let block_cursor = self.reader.block_cursor();
295 1723459 : let mut node_buf = [0_u8; PAGE_SZ];
296 1723459 : while let Some((node_blknum, opt_iter)) = stack.pop() {
297 1723459 : // Read the node, through the PS PageCache, into local variable `node_buf`.
298 1723459 : // We could keep the page cache read guard alive, but, at the time of writing,
299 1723459 : // we run quite small PS PageCache s => can't risk running out of
300 1723459 : // PageCache space because this stream isn't consumed fast enough.
301 1723459 : let page_read_guard = block_cursor
302 1723459 : .read_blk(self.start_blk + node_blknum, ctx)
303 1723459 : .await?;
304 1723459 : node_buf.copy_from_slice(page_read_guard.as_ref());
305 1723459 : drop(page_read_guard); // drop page cache read guard early
306 1723459 :
307 1723459 : let node = OnDiskNode::deparse(&node_buf)?;
308 1723459 : let prefix_len = node.prefix_len as usize;
309 1723459 : let suffix_len = node.suffix_len as usize;
310 1723459 :
311 1723459 : assert!(node.num_children > 0);
312 1723459 :
313 1723459 : let mut keybuf = Vec::new();
314 1723459 : keybuf.extend(node.prefix);
315 1723459 : keybuf.resize(prefix_len + suffix_len, 0);
316 1723459 :
317 1723459 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
318 1723459 : iter
319 1723459 : } else {
320 1723459 : // Locate the first match
321 1723459 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
322 1723459 : Ok(idx) => idx,
323 1723459 : Err(idx) => {
324 1723459 : if node.level == 0 {
325 1723459 : // Imagine that the node contains the following keys:
326 1723459 : //
327 1723459 : // 1
328 1723459 : // 3 <-- idx
329 1723459 : // 5
330 1723459 : //
331 1723459 : // If the search key is '2' and there is exact match,
332 1723459 : // the binary search would return the index of key
333 1723459 : // '3'. That's cool, '3' is the first key to return.
334 1723459 : idx
335 1723459 : } else {
336 1723459 : // This is an internal page, so each key represents a lower
337 1723459 : // bound for what's in the child page. If there is no exact
338 1723459 : // match, we have to return the *previous* entry.
339 1723459 : //
340 1723459 : // 1 <-- return this
341 1723459 : // 3 <-- idx
342 1723459 : // 5
343 1723459 : idx.saturating_sub(1)
344 1723459 : }
345 1723459 : }
346 1723459 : };
347 1723459 : Either::Left(idx..node.num_children.into())
348 1723459 : };
349 1723459 :
350 1723459 :
351 1723459 : // idx points to the first match now. Keep going from there
352 1723459 : while let Some(idx) = iter.next() {
353 1723459 : let key_off = idx * suffix_len;
354 1723459 : let suffix = &node.keys[key_off..key_off + suffix_len];
355 1723459 : keybuf[prefix_len..].copy_from_slice(suffix);
356 1723459 : let value = node.value(idx);
357 1723459 : #[allow(clippy::collapsible_if)]
358 1723459 : if node.level == 0 {
359 1723459 : // leaf
360 1723459 : yield (keybuf.clone(), value.to_u64());
361 1723459 : } else {
362 1723459 : stack.push((node_blknum, Some(iter)));
363 1723459 : stack.push((value.to_blknum(), None));
364 1723459 : break;
365 1723459 : }
366 1723459 : }
367 1723459 : }
368 1723459 : }
369 1723459 : }
370 :
371 : ///
372 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
373 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
374 : /// backwards)
375 : ///
376 2469525 : pub async fn visit<V>(
377 2469525 : &self,
378 2469525 : search_key: &[u8; L],
379 2469525 : dir: VisitDirection,
380 2469525 : mut visitor: V,
381 2469525 : ctx: &RequestContext,
382 2469525 : ) -> Result<bool>
383 2469525 : where
384 2469525 : V: FnMut(&[u8], u64) -> bool,
385 2469525 : {
386 2469525 : let mut stack = Vec::new();
387 2469525 : stack.push((self.root_blk, None));
388 2469525 : let block_cursor = self.reader.block_cursor();
389 7314006 : while let Some((node_blknum, opt_iter)) = stack.pop() {
390 : // Locate the node.
391 6110802 : let node_buf = block_cursor
392 6110802 : .read_blk(self.start_blk + node_blknum, ctx)
393 6110802 : .await?;
394 :
395 6110802 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
396 6110802 : let prefix_len = node.prefix_len as usize;
397 6110802 : let suffix_len = node.suffix_len as usize;
398 6110802 :
399 6110802 : assert!(node.num_children > 0);
400 :
401 6110802 : let mut keybuf = Vec::new();
402 6110802 : keybuf.extend(node.prefix);
403 6110802 : keybuf.resize(prefix_len + suffix_len, 0);
404 :
405 6110802 : let mut iter = if let Some(iter) = opt_iter {
406 1223388 : iter
407 4887414 : } else if dir == VisitDirection::Forwards {
408 : // Locate the first match
409 4863258 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
410 1219836 : Ok(idx) => idx,
411 3643422 : Err(idx) => {
412 3643422 : if node.level == 0 {
413 : // Imagine that the node contains the following keys:
414 : //
415 : // 1
416 : // 3 <-- idx
417 : // 5
418 : //
419 : // If the search key is '2' and there is exact match,
420 : // the binary search would return the index of key
421 : // '3'. That's cool, '3' is the first key to return.
422 1248693 : idx
423 : } else {
424 : // This is an internal page, so each key represents a lower
425 : // bound for what's in the child page. If there is no exact
426 : // match, we have to return the *previous* entry.
427 : //
428 : // 1 <-- return this
429 : // 3 <-- idx
430 : // 5
431 2394729 : idx.saturating_sub(1)
432 : }
433 : }
434 : };
435 4863258 : Either::Left(idx..node.num_children.into())
436 : } else {
437 24156 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
438 12012 : Ok(idx) => {
439 12012 : // Exact match. That's the first entry to return, and walk
440 12012 : // backwards from there.
441 12012 : idx
442 : }
443 12144 : Err(idx) => {
444 : // No exact match. The binary search returned the index of the
445 : // first key that's > search_key. Back off by one, and walk
446 : // backwards from there.
447 12144 : if let Some(idx) = idx.checked_sub(1) {
448 12120 : idx
449 : } else {
450 24 : return Ok(false);
451 : }
452 : }
453 : };
454 24132 : Either::Right((0..=idx).rev())
455 : };
456 :
457 : // idx points to the first match now. Keep going from there
458 18974502 : while let Some(idx) = iter.next() {
459 16547910 : let key_off = idx * suffix_len;
460 16547910 : let suffix = &node.keys[key_off..key_off + suffix_len];
461 16547910 : keybuf[prefix_len..].copy_from_slice(suffix);
462 16547910 : let value = node.value(idx);
463 16547910 : #[allow(clippy::collapsible_if)]
464 16547910 : if node.level == 0 {
465 : // leaf
466 14130021 : if !visitor(&keybuf, value.to_u64()) {
467 1266297 : return Ok(false);
468 12863724 : }
469 : } else {
470 2417889 : stack.push((node_blknum, Some(iter)));
471 2417889 : stack.push((value.to_blknum(), None));
472 2417889 : break;
473 : }
474 : }
475 : }
476 1203204 : Ok(true)
477 2469525 : }
478 :
479 : #[allow(dead_code)]
480 60 : pub async fn dump(&self, ctx: &RequestContext) -> Result<()> {
481 60 : let mut stack = Vec::new();
482 60 :
483 60 : stack.push((self.root_blk, String::new(), 0, 0, 0));
484 60 :
485 60 : let block_cursor = self.reader.block_cursor();
486 :
487 36252 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
488 36192 : let blk = block_cursor.read_blk(self.start_blk + blknum, ctx).await?;
489 36192 : let buf: &[u8] = blk.as_ref();
490 36192 : let node = OnDiskNode::<L>::deparse(buf)?;
491 :
492 36192 : if child_idx == 0 {
493 108 : print!("{:indent$}", "", indent = depth * 2);
494 108 : let path_prefix = stack
495 108 : .iter()
496 108 : .map(|(_blknum, path, ..)| path.as_str())
497 108 : .collect::<String>();
498 108 : println!(
499 108 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
500 108 : hex::encode(node.prefix),
501 108 : node.suffix_len
502 108 : );
503 36084 : }
504 :
505 36192 : if child_idx + 1 < node.num_children {
506 36084 : let key_off = key_off + node.suffix_len as usize;
507 36084 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
508 36084 : }
509 36192 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
510 36192 : let val = node.value(child_idx as usize);
511 36192 :
512 36192 : print!("{:indent$}", "", indent = depth * 2 + 2);
513 36192 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
514 36192 :
515 36192 : if node.level > 0 {
516 48 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
517 36144 : }
518 : }
519 60 : Ok(())
520 60 : }
521 : }
522 :
523 : pub struct DiskBtreeIterator<'a> {
524 : #[allow(clippy::type_complexity)]
525 : stream: std::pin::Pin<
526 : Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
527 : >,
528 : }
529 :
530 : impl DiskBtreeIterator<'_> {
531 13942725 : pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
532 13942725 : self.stream.next().await
533 13942725 : }
534 : }
535 :
536 : ///
537 : /// Public builder object, for creating a new tree.
538 : ///
539 : /// Usage: Create a builder object by calling 'new', load all the data into the
540 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
541 : ///
542 : /// 'L' is the key length in bytes
543 : pub struct DiskBtreeBuilder<W, const L: usize>
544 : where
545 : W: BlockWriter,
546 : {
547 : writer: W,
548 :
549 : ///
550 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
551 : ///
552 : /// We maintain the length of the stack to be always greater than zero.
553 : /// Two exceptions are:
554 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
555 : /// So because other methods cannot see the intermediate state invariant still holds.
556 : /// 2. `Self::finish`. It consumes self and does not return it back,
557 : /// which means that this is where the structure is destroyed.
558 : /// Thus stack of zero length cannot be observed by other methods.
559 : stack: Vec<BuildNode<L>>,
560 :
561 : /// Last key that was appended to the tree. Used to sanity check that append
562 : /// is called in increasing key order.
563 : last_key: Option<[u8; L]>,
564 : }
565 :
566 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
567 : where
568 : W: BlockWriter,
569 : {
570 12672 : pub fn new(writer: W) -> Self {
571 12672 : DiskBtreeBuilder {
572 12672 : writer,
573 12672 : last_key: None,
574 12672 : stack: vec![BuildNode::new(0)],
575 12672 : }
576 12672 : }
577 :
578 40491825 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
579 40491825 : if value > MAX_VALUE {
580 0 : return Err(DiskBtreeError::AppendOverflow(value));
581 40491825 : }
582 40491825 : if let Some(last_key) = &self.last_key {
583 40480485 : if key <= last_key {
584 12 : return Err(DiskBtreeError::UnsortedInput {
585 12 : key: key.as_slice().into(),
586 12 : last_key: last_key.as_slice().into(),
587 12 : });
588 40480473 : }
589 11340 : }
590 40491813 : self.last_key = Some(*key);
591 40491813 :
592 40491813 : self.append_internal(key, Value::from_u64(value))
593 40491825 : }
594 :
595 40566749 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
596 40566749 : // Try to append to the current leaf buffer
597 40566749 : let last = self
598 40566749 : .stack
599 40566749 : .last_mut()
600 40566749 : .expect("should always have at least one item");
601 40566749 : let level = last.level;
602 40566749 : if last.push(key, value) {
603 40424365 : return Ok(());
604 142384 : }
605 142384 :
606 142384 : // It did not fit. Try to compress, and if it succeeds to make
607 142384 : // some room on the node, try appending to it again.
608 142384 : #[allow(clippy::collapsible_if)]
609 142384 : if last.compress() {
610 72320 : if last.push(key, value) {
611 72272 : return Ok(());
612 48 : }
613 70064 : }
614 :
615 : // Could not append to the current leaf. Flush it and create a new one.
616 70112 : self.flush_node()?;
617 :
618 : // Replace the node we flushed with an empty one and append the new
619 : // key to it.
620 70112 : let mut last = BuildNode::new(level);
621 70112 : if !last.push(key, value) {
622 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
623 70112 : }
624 70112 :
625 70112 : self.stack.push(last);
626 70112 :
627 70112 : Ok(())
628 40566749 : }
629 :
630 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
631 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
632 : /// creates a new root page, increasing the height of the tree.
633 74936 : fn flush_node(&mut self) -> Result<()> {
634 74936 : // Get the current bottommost node in the stack and flush it to disk.
635 74936 : let last = self
636 74936 : .stack
637 74936 : .pop()
638 74936 : .expect("should always have at least one item");
639 74936 : let buf = last.pack();
640 74936 : let downlink_key = last.first_key();
641 74936 : let downlink_ptr = self.writer.write_blk(buf)?;
642 :
643 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
644 : // create a new root page, increasing the height of the tree.
645 74936 : if self.stack.is_empty() {
646 4824 : self.stack.push(BuildNode::new(last.level + 1));
647 70112 : }
648 74936 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
649 74936 : }
650 :
651 : ///
652 : /// Flushes everything to disk, and returns the block number of the root page.
653 : /// The caller must store the root block number "out-of-band", and pass it
654 : /// to the DiskBtreeReader::new() when you want to read the tree again.
655 : /// (In the image and delta layers, it is stored in the beginning of the file,
656 : /// in the summary header)
657 : ///
658 11040 : pub fn finish(mut self) -> Result<(u32, W)> {
659 : // flush all levels, except the root.
660 15864 : while self.stack.len() > 1 {
661 4824 : self.flush_node()?;
662 : }
663 :
664 11040 : let root = self
665 11040 : .stack
666 11040 : .first()
667 11040 : .expect("by the check above we left one item there");
668 11040 : let buf = root.pack();
669 11040 : let root_blknum = self.writer.write_blk(buf)?;
670 :
671 11040 : Ok((root_blknum, self.writer))
672 11040 : }
673 :
674 12285996 : pub fn borrow_writer(&self) -> &W {
675 12285996 : &self.writer
676 12285996 : }
677 : }
678 :
679 : ///
680 : /// BuildNode represesnts an incomplete page that we are appending to.
681 : ///
682 : #[derive(Clone, Debug)]
683 : struct BuildNode<const L: usize> {
684 : num_children: u16,
685 : level: u8,
686 : prefix: Vec<u8>,
687 : suffix_len: usize,
688 :
689 : keys: Vec<u8>,
690 : values: Vec<u8>,
691 :
692 : size: usize, // physical size of this node, if it was written to disk like this
693 : }
694 :
695 : const NODE_SIZE: usize = PAGE_SZ;
696 :
697 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
698 :
699 : impl<const L: usize> BuildNode<L> {
700 87608 : fn new(level: u8) -> Self {
701 87608 : BuildNode {
702 87608 : num_children: 0,
703 87608 : level,
704 87608 : prefix: Vec::new(),
705 87608 : suffix_len: 0,
706 87608 : keys: Vec::new(),
707 87608 : values: Vec::new(),
708 87608 : size: NODE_HDR_SIZE,
709 87608 : }
710 87608 : }
711 :
712 : /// Try to append a key-value pair to this node. Returns 'true' on
713 : /// success, 'false' if the page was full or the key was
714 : /// incompatible with the prefix of the existing keys.
715 40709181 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
716 40709181 : // If we have already performed prefix-compression on the page,
717 40709181 : // check that the incoming key has the same prefix.
718 40709181 : if self.num_children > 0 {
719 : // does the prefix allow it?
720 40622905 : if !key.starts_with(&self.prefix) {
721 1274 : return false;
722 40621631 : }
723 86276 : } else {
724 86276 : self.suffix_len = key.len();
725 86276 : }
726 :
727 : // Is the node too full?
728 40707907 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
729 141158 : return false;
730 40566749 : }
731 40566749 :
732 40566749 : // All clear
733 40566749 : self.num_children += 1;
734 40566749 : self.keys.extend(&key[self.prefix.len()..]);
735 40566749 : self.values.extend(value.0);
736 40566749 :
737 40566749 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
738 40566749 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
739 :
740 40566749 : self.size += self.suffix_len + VALUE_SZ;
741 40566749 :
742 40566749 : true
743 40709181 : }
744 :
745 : ///
746 : /// Perform prefix-compression.
747 : ///
748 : /// Returns 'true' on success, 'false' if no compression was possible.
749 : ///
750 142384 : fn compress(&mut self) -> bool {
751 142384 : let first_suffix = self.first_suffix();
752 142384 : let last_suffix = self.last_suffix();
753 142384 :
754 142384 : // Find the common prefix among all keys
755 142384 : let mut prefix_len = 0;
756 1295934 : while prefix_len < self.suffix_len {
757 1295934 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
758 142384 : break;
759 1153550 : }
760 1153550 : prefix_len += 1;
761 : }
762 142384 : if prefix_len == 0 {
763 70064 : return false;
764 72320 : }
765 72320 :
766 72320 : // Can compress. Rewrite the keys without the common prefix.
767 72320 : self.prefix.extend(&self.keys[..prefix_len]);
768 72320 :
769 72320 : let mut new_keys = Vec::new();
770 72320 : let mut key_off = 0;
771 19348598 : while key_off < self.keys.len() {
772 19276278 : let next_key_off = key_off + self.suffix_len;
773 19276278 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
774 19276278 : key_off = next_key_off;
775 19276278 : }
776 72320 : self.keys = new_keys;
777 72320 : self.suffix_len -= prefix_len;
778 72320 :
779 72320 : self.size -= prefix_len * self.num_children as usize;
780 72320 : self.size += prefix_len;
781 72320 :
782 72320 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
783 72320 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
784 :
785 72320 : true
786 142384 : }
787 :
788 : ///
789 : /// Serialize the node to on-disk format.
790 : ///
791 85976 : fn pack(&self) -> IoBuffer {
792 85976 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
793 85976 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
794 85976 : assert!(self.num_children > 0);
795 :
796 85976 : let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
797 85976 :
798 85976 : buf.put_u16(self.num_children);
799 85976 : buf.put_u8(self.level);
800 85976 : buf.put_u8(self.prefix.len() as u8);
801 85976 : buf.put_u8(self.suffix_len as u8);
802 85976 : buf.put(&self.prefix[..]);
803 85976 : buf.put(&self.keys[..]);
804 85976 : buf.put(&self.values[..]);
805 85976 :
806 85976 : assert!(buf.len() == self.size);
807 :
808 85976 : assert!(buf.len() <= PAGE_SZ);
809 85976 : buf.extend_with(0, PAGE_SZ - buf.len());
810 85976 : buf.freeze()
811 85976 : }
812 :
813 217320 : fn first_suffix(&self) -> &[u8] {
814 217320 : &self.keys[..self.suffix_len]
815 217320 : }
816 142384 : fn last_suffix(&self) -> &[u8] {
817 142384 : &self.keys[self.keys.len() - self.suffix_len..]
818 142384 : }
819 :
820 : /// Return the full first key of the page, including the prefix
821 74936 : fn first_key(&self) -> [u8; L] {
822 74936 : let mut key = [0u8; L];
823 74936 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
824 74936 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
825 74936 : key
826 74936 : }
827 : }
828 :
829 : #[cfg(test)]
830 : pub(crate) mod tests {
831 : use std::collections::BTreeMap;
832 : use std::sync::atomic::{AtomicUsize, Ordering};
833 :
834 : use rand::Rng;
835 :
836 : use super::*;
837 : use crate::context::DownloadBehavior;
838 : use crate::task_mgr::TaskKind;
839 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
840 :
841 : #[derive(Clone, Default)]
842 : pub(crate) struct TestDisk {
843 : blocks: Vec<IoBuffer>,
844 : }
845 : impl TestDisk {
846 60 : fn new() -> Self {
847 60 : Self::default()
848 60 : }
849 6100260 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
850 6100260 : let mut buf = [0u8; PAGE_SZ];
851 6100260 : buf.copy_from_slice(&self.blocks[blknum as usize]);
852 6100260 : Ok(std::sync::Arc::new(buf).into())
853 6100260 : }
854 : }
855 : impl BlockReader for TestDisk {
856 2467161 : fn block_cursor(&self) -> BlockCursor<'_> {
857 2467161 : BlockCursor::new(BlockReaderRef::TestDisk(self))
858 2467161 : }
859 : }
860 : impl BlockWriter for &mut TestDisk {
861 1293 : fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
862 1293 : let blknum = self.blocks.len();
863 1293 : self.blocks.push(buf);
864 1293 : Ok(blknum as u32)
865 1293 : }
866 : }
867 :
868 : #[tokio::test]
869 12 : async fn basic() -> Result<()> {
870 12 : let mut disk = TestDisk::new();
871 12 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
872 12 :
873 12 : let ctx =
874 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
875 12 :
876 12 : let all_keys: Vec<&[u8; 6]> = vec![
877 12 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
878 12 : ];
879 12 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
880 12 : .iter()
881 12 : .enumerate()
882 96 : .map(|(idx, key)| (*key, idx as u64))
883 12 : .collect();
884 96 : for (key, val) in all_data.iter() {
885 96 : writer.append(key, *val)?;
886 12 : }
887 12 :
888 12 : let (root_offset, _writer) = writer.finish()?;
889 12 :
890 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
891 12 :
892 12 : reader.dump(&ctx).await?;
893 12 :
894 12 : // Test the `get` function on all the keys.
895 96 : for (key, val) in all_data.iter() {
896 96 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
897 12 : }
898 12 : // And on some keys that don't exist
899 12 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
900 12 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
901 12 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
902 12 :
903 12 : // Test search with `visit` function
904 12 : let search_key = b"xabaaa";
905 12 : let expected: Vec<(Vec<u8>, u64)> = all_data
906 12 : .iter()
907 96 : .filter(|(key, _value)| key[..] >= search_key[..])
908 60 : .map(|(key, value)| (key.to_vec(), *value))
909 12 : .collect();
910 12 :
911 12 : let mut data = Vec::new();
912 12 : reader
913 12 : .visit(
914 12 : search_key,
915 12 : VisitDirection::Forwards,
916 60 : |key, value| {
917 60 : data.push((key.to_vec(), value));
918 60 : true
919 60 : },
920 12 : &ctx,
921 12 : )
922 12 : .await?;
923 12 : assert_eq!(data, expected);
924 12 :
925 12 : // Test a backwards scan
926 12 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
927 12 : .iter()
928 96 : .filter(|(key, _value)| key[..] <= search_key[..])
929 48 : .map(|(key, value)| (key.to_vec(), *value))
930 12 : .collect();
931 12 : expected.reverse();
932 12 : let mut data = Vec::new();
933 12 : reader
934 12 : .visit(
935 12 : search_key,
936 12 : VisitDirection::Backwards,
937 48 : |key, value| {
938 48 : data.push((key.to_vec(), value));
939 48 : true
940 48 : },
941 12 : &ctx,
942 12 : )
943 12 : .await?;
944 12 : assert_eq!(data, expected);
945 12 :
946 12 : // Backward scan where nothing matches
947 12 : reader
948 12 : .visit(
949 12 : b"aaaaaa",
950 12 : VisitDirection::Backwards,
951 12 : |key, value| {
952 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
953 12 : },
954 12 : &ctx,
955 12 : )
956 12 : .await?;
957 12 :
958 12 : // Full scan
959 12 : let expected: Vec<(Vec<u8>, u64)> = all_data
960 12 : .iter()
961 96 : .map(|(key, value)| (key.to_vec(), *value))
962 12 : .collect();
963 12 : let mut data = Vec::new();
964 12 : reader
965 12 : .visit(
966 12 : &[0u8; 6],
967 12 : VisitDirection::Forwards,
968 96 : |key, value| {
969 96 : data.push((key.to_vec(), value));
970 96 : true
971 96 : },
972 12 : &ctx,
973 12 : )
974 12 : .await?;
975 12 : assert_eq!(data, expected);
976 12 :
977 12 : Ok(())
978 12 : }
979 :
980 : #[tokio::test]
981 12 : async fn lots_of_keys() -> Result<()> {
982 12 : let mut disk = TestDisk::new();
983 12 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
984 12 : let ctx =
985 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
986 12 :
987 12 : const NUM_KEYS: u64 = 1000;
988 12 :
989 12 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
990 12 :
991 12012 : for idx in 0..NUM_KEYS {
992 12000 : let key_int: u64 = 1 + idx * 2;
993 12000 : let key = u64::to_be_bytes(key_int);
994 12000 : writer.append(&key, idx)?;
995 12 :
996 12000 : all_data.insert(key_int, idx);
997 12 : }
998 12 :
999 12 : let (root_offset, _writer) = writer.finish()?;
1000 12 :
1001 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1002 12 :
1003 12 : reader.dump(&ctx).await?;
1004 12 :
1005 12 : use std::sync::Mutex;
1006 12 :
1007 12 : let result = Mutex::new(Vec::new());
1008 12 : let limit: AtomicUsize = AtomicUsize::new(10);
1009 502920 : let take_ten = |key: &[u8], value: u64| {
1010 502920 : let mut keybuf = [0u8; 8];
1011 502920 : keybuf.copy_from_slice(key);
1012 502920 : let key_int = u64::from_be_bytes(keybuf);
1013 502920 :
1014 502920 : let mut result = result.lock().unwrap();
1015 502920 : result.push((key_int, value));
1016 502920 :
1017 502920 : // keep going until we have 10 matches
1018 502920 : result.len() < limit.load(Ordering::Relaxed)
1019 502920 : };
1020 12 :
1021 24120 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
1022 24120 : let search_key = u64::to_be_bytes(search_key_int);
1023 24120 : assert_eq!(
1024 24120 : reader.get(&search_key, &ctx).await?,
1025 24120 : all_data.get(&search_key_int).cloned()
1026 12 : );
1027 12 :
1028 12 : // Test a forward scan starting with this key
1029 24120 : result.lock().unwrap().clear();
1030 24120 : reader
1031 24120 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1032 24120 : .await?;
1033 24120 : let expected = all_data
1034 24120 : .range(search_key_int..)
1035 24120 : .take(10)
1036 238920 : .map(|(&key, &val)| (key, val))
1037 24120 : .collect::<Vec<(u64, u64)>>();
1038 24120 : assert_eq!(*result.lock().unwrap(), expected);
1039 12 :
1040 12 : // And a backwards scan
1041 24120 : result.lock().unwrap().clear();
1042 24120 : reader
1043 24120 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1044 24120 : .await?;
1045 24120 : let expected = all_data
1046 24120 : .range(..=search_key_int)
1047 24120 : .rev()
1048 24120 : .take(10)
1049 240000 : .map(|(&key, &val)| (key, val))
1050 24120 : .collect::<Vec<(u64, u64)>>();
1051 24120 : assert_eq!(*result.lock().unwrap(), expected);
1052 12 : }
1053 12 :
1054 12 : // full scan
1055 12 : let search_key = u64::to_be_bytes(0);
1056 12 : limit.store(usize::MAX, Ordering::Relaxed);
1057 12 : result.lock().unwrap().clear();
1058 12 : reader
1059 12 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1060 12 : .await?;
1061 12 : let expected = all_data
1062 12 : .iter()
1063 12000 : .map(|(&key, &val)| (key, val))
1064 12 : .collect::<Vec<(u64, u64)>>();
1065 12 : assert_eq!(*result.lock().unwrap(), expected);
1066 12 :
1067 12 : // full scan
1068 12 : let search_key = u64::to_be_bytes(u64::MAX);
1069 12 : limit.store(usize::MAX, Ordering::Relaxed);
1070 12 : result.lock().unwrap().clear();
1071 12 : reader
1072 12 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1073 12 : .await?;
1074 12 : let expected = all_data
1075 12 : .iter()
1076 12 : .rev()
1077 12000 : .map(|(&key, &val)| (key, val))
1078 12 : .collect::<Vec<(u64, u64)>>();
1079 12 : assert_eq!(*result.lock().unwrap(), expected);
1080 12 :
1081 12 : Ok(())
1082 12 : }
1083 :
1084 : #[tokio::test]
1085 12 : async fn random_data() -> Result<()> {
1086 12 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1087 12 :
1088 12 : // Generate random keys with exponential distribution, to
1089 12 : // exercise the prefix compression
1090 12 : const NUM_KEYS: usize = 100000;
1091 12 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1092 1200012 : for idx in 0..NUM_KEYS {
1093 1200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1094 1200000 : let t = -(f64::ln(u));
1095 1200000 : let key_int = (t * 1000000.0) as u128;
1096 1200000 :
1097 1200000 : all_data.insert(key_int, idx as u64);
1098 1200000 : }
1099 12 :
1100 12 : // Build a tree from it
1101 12 : let mut disk = TestDisk::new();
1102 12 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1103 12 :
1104 1170489 : for (&key, &val) in all_data.iter() {
1105 1170489 : writer.append(&u128::to_be_bytes(key), val)?;
1106 12 : }
1107 12 : let (root_offset, _writer) = writer.finish()?;
1108 12 :
1109 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1110 12 :
1111 12 : // Test get() operation on all the keys
1112 1170489 : for (&key, &val) in all_data.iter() {
1113 1170489 : let search_key = u128::to_be_bytes(key);
1114 1170489 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1115 12 : }
1116 12 :
1117 12 : // Test get() operations on random keys, most of which will not exist
1118 1200012 : for _ in 0..100000 {
1119 1200000 : let key_int = rand::thread_rng().r#gen::<u128>();
1120 1200000 : let search_key = u128::to_be_bytes(key_int);
1121 1200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1122 12 : }
1123 12 :
1124 12 : // Test boundary cases
1125 12 : assert!(
1126 12 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1127 12 : == all_data.get(&u128::MIN).cloned()
1128 12 : );
1129 12 : assert!(
1130 12 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1131 12 : == all_data.get(&u128::MAX).cloned()
1132 12 : );
1133 12 :
1134 12 : // Test iterator and get_stream API
1135 12 : let mut iter = reader.iter(&[0; 16], &ctx);
1136 12 : let mut cnt = 0;
1137 1170501 : while let Some(res) = iter.next().await {
1138 1170489 : let (key, val) = res?;
1139 1170489 : let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
1140 1170489 : assert_eq!(val, *all_data.get(&key).unwrap());
1141 1170489 : cnt += 1;
1142 12 : }
1143 12 : assert_eq!(cnt, all_data.len());
1144 12 :
1145 12 : Ok(())
1146 12 : }
1147 :
/// Appending a key that sorts before the previously appended key must be
/// rejected with `DiskBtreeError::UnsortedInput`, carrying both the offending
/// key and the last accepted key.
#[test]
fn unsorted_input() {
    let mut disk = TestDisk::new();
    let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

    // The in-order appends are setup; if they fail, report it here instead of
    // letting the failure surface as a confusing mismatch further down.
    writer
        .append(b"ba", 1)
        .expect("appending the first key should succeed");
    writer
        .append(b"bb", 2)
        .expect("appending a larger key should succeed");

    let err = writer.append(b"aa", 3).expect_err("should've failed");
    match err {
        DiskBtreeError::UnsortedInput { key, last_key } => {
            assert_eq!(key.as_ref(), b"aa".as_slice());
            assert_eq!(last_key.as_ref(), b"bb".as_slice());
        }
        other => panic!(
            "unexpected error variant, expected DiskBtreeError::UnsortedInput, got: {other:?}"
        ),
    }
}

1164 :
1165 : ///
1166 : /// This test contains a particular data set, see disk_btree_test_data.rs
1167 : ///
1168 : #[tokio::test]
1169 12 : async fn particular_data() -> Result<()> {
1170 12 : // Build a tree from it
1171 12 : let mut disk = TestDisk::new();
1172 12 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1173 12 : let ctx =
1174 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1175 12 :
1176 24012 : for (key, val) in disk_btree_test_data::TEST_DATA {
1177 24000 : writer.append(&key, val)?;
1178 12 : }
1179 12 : let (root_offset, writer) = writer.finish()?;
1180 12 :
1181 12 : println!("SIZE: {} blocks", writer.blocks.len());
1182 12 :
1183 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1184 12 :
1185 12 : // Test get() operation on all the keys
1186 24012 : for (key, val) in disk_btree_test_data::TEST_DATA {
1187 24000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1188 12 : }
1189 12 :
1190 12 : // Test full scan
1191 12 : let mut count = 0;
1192 12 : reader
1193 12 : .visit(
1194 12 : &[0u8; 26],
1195 12 : VisitDirection::Forwards,
1196 24000 : |_key, _value| {
1197 24000 : count += 1;
1198 24000 : true
1199 24000 : },
1200 12 : &ctx,
1201 12 : )
1202 12 : .await?;
1203 12 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1204 12 :
1205 12 : reader.dump(&ctx).await?;
1206 12 :
1207 12 : Ok(())
1208 12 : }
1209 : }
1210 :
1211 : #[cfg(test)]
1212 : #[path = "disk_btree_test_data.rs"]
1213 : mod disk_btree_test_data;
|