Line data Source code
1 : //!
2 : //! Simple on-disk B-tree implementation
3 : //!
4 : //! This is used as the index structure within image and delta layers
5 : //!
6 : //! Features:
7 : //! - Fixed-width keys
8 : //! - Fixed-width values (VALUE_SZ)
9 : //! - The tree is created in a bulk operation. Insert/deletion after creation
10 : //! is not supported
11 : //! - page-oriented
12 : //!
13 : //! TODO:
14 : //! - maybe something like an Adaptive Radix Tree would be more efficient?
15 : //! - the values stored by image and delta layers are offsets into the file,
16 : //! and they are in monotonically increasing order. Prefix compression would
17 : //! be very useful for them, too.
18 : //! - An Iterator interface would be more convenient for the callers than the
19 : //! 'visit' function
20 : //!
21 : use std::cmp::Ordering;
22 : use std::iter::Rev;
23 : use std::ops::{Range, RangeInclusive};
24 : use std::{io, result};
25 :
26 : use async_stream::try_stream;
27 : use byteorder::{BE, ReadBytesExt};
28 : use bytes::{BufMut, Bytes, BytesMut};
29 : use either::Either;
30 : use futures::{Stream, StreamExt};
31 : use hex;
32 : use thiserror::Error;
33 : use tracing::error;
34 :
35 : use crate::context::RequestContext;
36 : use crate::tenant::block_io::{BlockReader, BlockWriter};
37 :
38 : // The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
39 : pub const VALUE_SZ: usize = 5;
40 : pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
41 :
42 : pub const PAGE_SZ: usize = 8192;
43 :
44 : #[derive(Clone, Copy, Debug)]
45 : struct Value([u8; VALUE_SZ]);
46 :
47 : impl Value {
48 54676173 : fn from_slice(slice: &[u8]) -> Value {
49 54676173 : let mut b = [0u8; VALUE_SZ];
50 54676173 : b.copy_from_slice(slice);
51 54676173 : Value(b)
52 54676173 : }
53 :
54 40492036 : fn from_u64(x: u64) -> Value {
55 40492036 : assert!(x <= 0x007f_ffff_ffff);
56 40492036 : Value([
57 40492036 : (x >> 32) as u8,
58 40492036 : (x >> 24) as u8,
59 40492036 : (x >> 16) as u8,
60 40492036 : (x >> 8) as u8,
61 40492036 : x as u8,
62 40492036 : ])
63 40492036 : }
64 :
65 74988 : fn from_blknum(x: u32) -> Value {
66 74988 : Value([
67 74988 : 0x80,
68 74988 : (x >> 24) as u8,
69 74988 : (x >> 16) as u8,
70 74988 : (x >> 8) as u8,
71 74988 : x as u8,
72 74988 : ])
73 74988 : }
74 :
75 : #[allow(dead_code)]
76 0 : fn is_offset(self) -> bool {
77 0 : self.0[0] & 0x80 != 0
78 0 : }
79 :
80 50725595 : fn to_u64(self) -> u64 {
81 50725595 : let b = &self.0;
82 50725595 : ((b[0] as u64) << 32)
83 50725595 : | ((b[1] as u64) << 24)
84 50725595 : | ((b[2] as u64) << 16)
85 50725595 : | ((b[3] as u64) << 8)
86 50725595 : | b[4] as u64
87 50725595 : }
88 :
89 3914434 : fn to_blknum(self) -> u32 {
90 3914434 : let b = &self.0;
91 3914434 : assert!(b[0] == 0x80);
92 3914434 : ((b[1] as u32) << 24) | ((b[2] as u32) << 16) | ((b[3] as u32) << 8) | b[4] as u32
93 3914434 : }
94 : }
95 :
/// Errors returned while building or reading the on-disk B-tree.
#[derive(Error, Debug)]
pub enum DiskBtreeError {
    /// `append` was given a value larger than `MAX_VALUE`; carries the
    /// rejected value.
    #[error("Attempt to append a value that is too large {0} > {}", MAX_VALUE)]
    AppendOverflow(u64),

    /// `append` was called with a key that is not strictly greater than the
    /// previously appended key.
    #[error("Unsorted input: key {key:?} is <= last_key {last_key:?}")]
    UnsortedInput { key: Box<[u8]>, last_key: Box<[u8]> },

    /// A key/value pair could not be pushed even onto a freshly created,
    /// empty node.
    #[error("Could not push to new leaf node")]
    FailedToPushToNewLeafNode,

    /// An underlying I/O error; `?` converts `io::Error` into this variant.
    #[error("IoError: {0}")]
    Io(#[from] io::Error),
}

/// Result type used by this module, with [`DiskBtreeError`] as the error.
pub type Result<T> = result::Result<T, DiskBtreeError>;
112 :
113 : /// This is the on-disk representation.
114 : struct OnDiskNode<'a, const L: usize> {
115 : // Fixed-width fields
116 : num_children: u16,
117 : level: u8,
118 : prefix_len: u8,
119 : suffix_len: u8,
120 :
121 : // Variable-length fields. These are stored on-disk after the fixed-width
122 : // fields, in this order. In the in-memory representation, these point to
123 : // the right parts in the page buffer.
124 : prefix: &'a [u8],
125 : keys: &'a [u8],
126 : values: &'a [u8],
127 : }
128 :
129 : impl<const L: usize> OnDiskNode<'_, L> {
130 : ///
131 : /// Interpret a PAGE_SZ page as a node.
132 : ///
133 9471498 : fn deparse(buf: &[u8]) -> Result<OnDiskNode<L>> {
134 9471498 : let mut cursor = std::io::Cursor::new(buf);
135 9471498 : let num_children = cursor.read_u16::<BE>()?;
136 9471498 : let level = cursor.read_u8()?;
137 9471498 : let prefix_len = cursor.read_u8()?;
138 9471498 : let suffix_len = cursor.read_u8()?;
139 :
140 9471498 : let mut off = cursor.position();
141 9471498 : let prefix_off = off as usize;
142 9471498 : off += prefix_len as u64;
143 9471498 :
144 9471498 : let keys_off = off as usize;
145 9471498 : let keys_len = num_children as usize * suffix_len as usize;
146 9471498 : off += keys_len as u64;
147 9471498 :
148 9471498 : let values_off = off as usize;
149 9471498 : let values_len = num_children as usize * VALUE_SZ;
150 9471498 : //off += values_len as u64;
151 9471498 :
152 9471498 : let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
153 9471498 : let keys = &buf[keys_off..keys_off + keys_len];
154 9471498 : let values = &buf[values_off..values_off + values_len];
155 9471498 :
156 9471498 : Ok(OnDiskNode {
157 9471498 : num_children,
158 9471498 : level,
159 9471498 : prefix_len,
160 9471498 : suffix_len,
161 9471498 : prefix,
162 9471498 : keys,
163 9471498 : values,
164 9471498 : })
165 9471498 : }
166 :
167 : ///
168 : /// Read a value at 'idx'
169 : ///
170 54676173 : fn value(&self, idx: usize) -> Value {
171 54676173 : let value_off = idx * VALUE_SZ;
172 54676173 : let value_slice = &self.values[value_off..value_off + VALUE_SZ];
173 54676173 : Value::from_slice(value_slice)
174 54676173 : }
175 :
176 8104712 : fn binary_search(
177 8104712 : &self,
178 8104712 : search_key: &[u8; L],
179 8104712 : keybuf: &mut [u8],
180 8104712 : ) -> result::Result<usize, usize> {
181 8104712 : let mut size = self.num_children as usize;
182 8104712 : let mut low = 0;
183 8104712 : let mut high = size;
184 60373786 : while low < high {
185 53919043 : let mid = low + size / 2;
186 53919043 :
187 53919043 : let key_off = mid * self.suffix_len as usize;
188 53919043 : let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
189 53919043 : // Does this match?
190 53919043 : keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
191 53919043 :
192 53919043 : let cmp = keybuf[..].cmp(search_key);
193 53919043 :
194 53919043 : if cmp == Ordering::Less {
195 33523723 : low = mid + 1;
196 33523723 : } else if cmp == Ordering::Greater {
197 18745351 : high = mid;
198 18745351 : } else {
199 1649969 : return Ok(mid);
200 : }
201 52269074 : size = high - low;
202 : }
203 6454743 : Err(low)
204 8104712 : }
205 : }
206 :
///
/// Public reader object, to search the tree.
///
/// 'L' is the key length in bytes
#[derive(Clone)]
pub struct DiskBtreeReader<R, const L: usize>
where
    R: BlockReader,
{
    /// Added to every tree-relative block number before the block is read
    /// through `reader`.
    start_blk: u32,
    /// Tree-relative block number of the root node; every traversal starts here.
    root_blk: u32,
    /// Source of the blocks, accessed through `block_cursor()`.
    reader: R,
}

/// Scan direction for [`DiskBtreeReader::visit`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VisitDirection {
    /// Visit keys >= the search key, in increasing key order.
    Forwards,
    /// Visit keys <= the search key, in decreasing key order.
    Backwards,
}
225 :
226 : impl<R, const L: usize> DiskBtreeReader<R, L>
227 : where
228 : R: BlockReader,
229 : {
230 1604766 : pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
231 1604766 : DiskBtreeReader {
232 1604766 : start_blk,
233 1604766 : root_blk,
234 1604766 : reader,
235 1604766 : }
236 1604766 : }
237 :
238 : ///
239 : /// Read the value for given key. Returns the value, or None if it doesn't exist.
240 : ///
241 2418988 : pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
242 2418988 : let mut result: Option<u64> = None;
243 2418988 : self.visit(
244 2418988 : search_key,
245 2418988 : VisitDirection::Forwards,
246 2418988 : |key, value| {
247 1218844 : if key == search_key {
248 1206808 : result = Some(value);
249 1206808 : }
250 1218844 : false
251 2418988 : },
252 2418988 : ctx,
253 2418988 : )
254 2418988 : .await?;
255 2418988 : Ok(result)
256 2418988 : }
257 :
258 4224 : pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
259 4224 : where
260 4224 : R: 'a + Send,
261 4224 : {
262 4224 : DiskBtreeIterator {
263 4224 : stream: Box::pin(self.into_stream(start_key, ctx)),
264 4224 : }
265 4224 : }
266 :
267 : /// Return a stream which yields all key, value pairs from the index
268 : /// starting from the first key greater or equal to `start_key`.
269 : ///
270 : /// Note 1: that this is a copy of [`Self::visit`].
271 : /// TODO: Once the sequential read path is removed this will become
272 : /// the only index traversal method.
273 : ///
274 : /// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
275 : /// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
276 : /// requires the reader to be present when the stream is used, and this creates a lifetime
277 : /// dependency between the reader and the stream. Now if we want to create an iterator that
278 : /// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
279 : /// to use from the image/delta layer APIs.
280 : ///
281 : /// Feel free to add the `&self` variant back if it's necessary.
282 1720578 : pub fn into_stream<'a>(
283 1720578 : self,
284 1720578 : start_key: &'a [u8; L],
285 1720578 : ctx: &'a RequestContext,
286 1720578 : ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
287 1720578 : where
288 1720578 : R: 'a,
289 1720578 : {
290 1720578 : try_stream! {
291 1720578 : let mut stack = Vec::new();
292 1720578 : stack.push((self.root_blk, None));
293 1720578 : let block_cursor = self.reader.block_cursor();
294 1720578 : let mut node_buf = [0_u8; PAGE_SZ];
295 1720578 : while let Some((node_blknum, opt_iter)) = stack.pop() {
296 1720578 : // Read the node, through the PS PageCache, into local variable `node_buf`.
297 1720578 : // We could keep the page cache read guard alive, but, at the time of writing,
298 1720578 : // we run quite small PS PageCache s => can't risk running out of
299 1720578 : // PageCache space because this stream isn't consumed fast enough.
300 1720578 : let page_read_guard = block_cursor
301 1720578 : .read_blk(self.start_blk + node_blknum, ctx)
302 1720578 : .await?;
303 1720578 : node_buf.copy_from_slice(page_read_guard.as_ref());
304 1720578 : drop(page_read_guard); // drop page cache read guard early
305 1720578 :
306 1720578 : let node = OnDiskNode::deparse(&node_buf)?;
307 1720578 : let prefix_len = node.prefix_len as usize;
308 1720578 : let suffix_len = node.suffix_len as usize;
309 1720578 :
310 1720578 : assert!(node.num_children > 0);
311 1720578 :
312 1720578 : let mut keybuf = Vec::new();
313 1720578 : keybuf.extend(node.prefix);
314 1720578 : keybuf.resize(prefix_len + suffix_len, 0);
315 1720578 :
316 1720578 : let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
317 1720578 : iter
318 1720578 : } else {
319 1720578 : // Locate the first match
320 1720578 : let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
321 1720578 : Ok(idx) => idx,
322 1720578 : Err(idx) => {
323 1720578 : if node.level == 0 {
324 1720578 : // Imagine that the node contains the following keys:
325 1720578 : //
326 1720578 : // 1
327 1720578 : // 3 <-- idx
328 1720578 : // 5
329 1720578 : //
330 1720578 : // If the search key is '2' and there is exact match,
331 1720578 : // the binary search would return the index of key
332 1720578 : // '3'. That's cool, '3' is the first key to return.
333 1720578 : idx
334 1720578 : } else {
335 1720578 : // This is an internal page, so each key represents a lower
336 1720578 : // bound for what's in the child page. If there is no exact
337 1720578 : // match, we have to return the *previous* entry.
338 1720578 : //
339 1720578 : // 1 <-- return this
340 1720578 : // 3 <-- idx
341 1720578 : // 5
342 1720578 : idx.saturating_sub(1)
343 1720578 : }
344 1720578 : }
345 1720578 : };
346 1720578 : Either::Left(idx..node.num_children.into())
347 1720578 : };
348 1720578 :
349 1720578 :
350 1720578 : // idx points to the first match now. Keep going from there
351 1720578 : while let Some(idx) = iter.next() {
352 1720578 : let key_off = idx * suffix_len;
353 1720578 : let suffix = &node.keys[key_off..key_off + suffix_len];
354 1720578 : keybuf[prefix_len..].copy_from_slice(suffix);
355 1720578 : let value = node.value(idx);
356 1720578 : #[allow(clippy::collapsible_if)]
357 1720578 : if node.level == 0 {
358 1720578 : // leaf
359 1720578 : yield (keybuf.clone(), value.to_u64());
360 1720578 : } else {
361 1720578 : stack.push((node_blknum, Some(iter)));
362 1720578 : stack.push((value.to_blknum(), None));
363 1720578 : break;
364 1720578 : }
365 1720578 : }
366 1720578 : }
367 1720578 : }
368 1720578 : }
369 :
370 : ///
371 : /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
372 : /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
373 : /// backwards)
374 : ///
375 2469748 : pub async fn visit<V>(
376 2469748 : &self,
377 2469748 : search_key: &[u8; L],
378 2469748 : dir: VisitDirection,
379 2469748 : mut visitor: V,
380 2469748 : ctx: &RequestContext,
381 2469748 : ) -> Result<bool>
382 2469748 : where
383 2469748 : V: FnMut(&[u8], u64) -> bool,
384 2469748 : {
385 2469748 : let mut stack = Vec::new();
386 2469748 : stack.push((self.root_blk, None));
387 2469748 : let block_cursor = self.reader.block_cursor();
388 7314452 : while let Some((node_blknum, opt_iter)) = stack.pop() {
389 : // Locate the node.
390 6111248 : let node_buf = block_cursor
391 6111248 : .read_blk(self.start_blk + node_blknum, ctx)
392 6111248 : .await?;
393 :
394 6111248 : let node = OnDiskNode::deparse(node_buf.as_ref())?;
395 6111248 : let prefix_len = node.prefix_len as usize;
396 6111248 : let suffix_len = node.suffix_len as usize;
397 6111248 :
398 6111248 : assert!(node.num_children > 0);
399 :
400 6111248 : let mut keybuf = Vec::new();
401 6111248 : keybuf.extend(node.prefix);
402 6111248 : keybuf.resize(prefix_len + suffix_len, 0);
403 :
404 6111248 : let mut iter = if let Some(iter) = opt_iter {
405 1223388 : iter
406 4887860 : } else if dir == VisitDirection::Forwards {
407 : // Locate the first match
408 4863704 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
409 1220057 : Ok(idx) => idx,
410 3643647 : Err(idx) => {
411 3643647 : if node.level == 0 {
412 : // Imagine that the node contains the following keys:
413 : //
414 : // 1
415 : // 3 <-- idx
416 : // 5
417 : //
418 : // If the search key is '2' and there is exact match,
419 : // the binary search would return the index of key
420 : // '3'. That's cool, '3' is the first key to return.
421 1248696 : idx
422 : } else {
423 : // This is an internal page, so each key represents a lower
424 : // bound for what's in the child page. If there is no exact
425 : // match, we have to return the *previous* entry.
426 : //
427 : // 1 <-- return this
428 : // 3 <-- idx
429 : // 5
430 2394951 : idx.saturating_sub(1)
431 : }
432 : }
433 : };
434 4863704 : Either::Left(idx..node.num_children.into())
435 : } else {
436 24156 : let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
437 12012 : Ok(idx) => {
438 12012 : // Exact match. That's the first entry to return, and walk
439 12012 : // backwards from there.
440 12012 : idx
441 : }
442 12144 : Err(idx) => {
443 : // No exact match. The binary search returned the index of the
444 : // first key that's > search_key. Back off by one, and walk
445 : // backwards from there.
446 12144 : if let Some(idx) = idx.checked_sub(1) {
447 12120 : idx
448 : } else {
449 24 : return Ok(false);
450 : }
451 : }
452 : };
453 24132 : Either::Right((0..=idx).rev())
454 : };
455 :
456 : // idx points to the first match now. Keep going from there
457 18974948 : while let Some(idx) = iter.next() {
458 16548356 : let key_off = idx * suffix_len;
459 16548356 : let suffix = &node.keys[key_off..key_off + suffix_len];
460 16548356 : keybuf[prefix_len..].copy_from_slice(suffix);
461 16548356 : let value = node.value(idx);
462 16548356 : #[allow(clippy::collapsible_if)]
463 16548356 : if node.level == 0 {
464 : // leaf
465 14130244 : if !visitor(&keybuf, value.to_u64()) {
466 1266520 : return Ok(false);
467 12863724 : }
468 : } else {
469 2418112 : stack.push((node_blknum, Some(iter)));
470 2418112 : stack.push((value.to_blknum(), None));
471 2418112 : break;
472 : }
473 : }
474 : }
475 1203204 : Ok(true)
476 2469748 : }
477 :
478 : #[allow(dead_code)]
479 60 : pub async fn dump(&self, ctx: &RequestContext) -> Result<()> {
480 60 : let mut stack = Vec::new();
481 60 :
482 60 : stack.push((self.root_blk, String::new(), 0, 0, 0));
483 60 :
484 60 : let block_cursor = self.reader.block_cursor();
485 :
486 36252 : while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
487 36192 : let blk = block_cursor.read_blk(self.start_blk + blknum, ctx).await?;
488 36192 : let buf: &[u8] = blk.as_ref();
489 36192 : let node = OnDiskNode::<L>::deparse(buf)?;
490 :
491 36192 : if child_idx == 0 {
492 108 : print!("{:indent$}", "", indent = depth * 2);
493 108 : let path_prefix = stack
494 108 : .iter()
495 108 : .map(|(_blknum, path, ..)| path.as_str())
496 108 : .collect::<String>();
497 108 : println!(
498 108 : "blk #{blknum}: path {path_prefix}{path}: prefix {}, suffix_len {}",
499 108 : hex::encode(node.prefix),
500 108 : node.suffix_len
501 108 : );
502 36084 : }
503 :
504 36192 : if child_idx + 1 < node.num_children {
505 36084 : let key_off = key_off + node.suffix_len as usize;
506 36084 : stack.push((blknum, path.clone(), depth, child_idx + 1, key_off));
507 36084 : }
508 36192 : let key = &node.keys[key_off..key_off + node.suffix_len as usize];
509 36192 : let val = node.value(child_idx as usize);
510 36192 :
511 36192 : print!("{:indent$}", "", indent = depth * 2 + 2);
512 36192 : println!("{}: {}", hex::encode(key), hex::encode(val.0));
513 36192 :
514 36192 : if node.level > 0 {
515 48 : stack.push((val.to_blknum(), hex::encode(node.prefix), depth + 1, 0, 0));
516 36144 : }
517 : }
518 60 : Ok(())
519 60 : }
520 : }
521 :
/// Iterator-like wrapper around the boxed stream created by
/// [`DiskBtreeReader::iter`]. Items are pulled one at a time with
/// [`DiskBtreeIterator::next`].
pub struct DiskBtreeIterator<'a> {
    #[allow(clippy::type_complexity)]
    stream: std::pin::Pin<
        Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
    >,
}

impl DiskBtreeIterator<'_> {
    /// Returns the next item of the underlying stream: a `(key, value)` pair
    /// or an error, or `None` once the stream is exhausted.
    pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
        self.stream.next().await
    }
}
534 :
535 : ///
536 : /// Public builder object, for creating a new tree.
537 : ///
538 : /// Usage: Create a builder object by calling 'new', load all the data into the
539 : /// tree by calling 'append' for each key-value pair, and then call 'finish'
540 : ///
541 : /// 'L' is the key length in bytes
542 : pub struct DiskBtreeBuilder<W, const L: usize>
543 : where
544 : W: BlockWriter,
545 : {
546 : writer: W,
547 :
548 : ///
549 : /// `stack[0]` is the current root page, `stack.last()` is the leaf.
550 : ///
551 : /// We maintain the length of the stack to be always greater than zero.
552 : /// Two exceptions are:
553 : /// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
554 : /// So because other methods cannot see the intermediate state invariant still holds.
555 : /// 2. `Self::finish`. It consumes self and does not return it back,
556 : /// which means that this is where the structure is destroyed.
557 : /// Thus stack of zero length cannot be observed by other methods.
558 : stack: Vec<BuildNode<L>>,
559 :
560 : /// Last key that was appended to the tree. Used to sanity check that append
561 : /// is called in increasing key order.
562 : last_key: Option<[u8; L]>,
563 : }
564 :
565 : impl<W, const L: usize> DiskBtreeBuilder<W, L>
566 : where
567 : W: BlockWriter,
568 : {
569 12672 : pub fn new(writer: W) -> Self {
570 12672 : DiskBtreeBuilder {
571 12672 : writer,
572 12672 : last_key: None,
573 12672 : stack: vec![BuildNode::new(0)],
574 12672 : }
575 12672 : }
576 :
577 40492048 : pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<()> {
578 40492048 : if value > MAX_VALUE {
579 0 : return Err(DiskBtreeError::AppendOverflow(value));
580 40492048 : }
581 40492048 : if let Some(last_key) = &self.last_key {
582 40480708 : if key <= last_key {
583 12 : return Err(DiskBtreeError::UnsortedInput {
584 12 : key: key.as_slice().into(),
585 12 : last_key: last_key.as_slice().into(),
586 12 : });
587 40480696 : }
588 11340 : }
589 40492036 : self.last_key = Some(*key);
590 40492036 :
591 40492036 : self.append_internal(key, Value::from_u64(value))
592 40492048 : }
593 :
594 40567024 : fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
595 40567024 : // Try to append to the current leaf buffer
596 40567024 : let last = self
597 40567024 : .stack
598 40567024 : .last_mut()
599 40567024 : .expect("should always have at least one item");
600 40567024 : let level = last.level;
601 40567024 : if last.push(key, value) {
602 40424564 : return Ok(());
603 142460 : }
604 142460 :
605 142460 : // It did not fit. Try to compress, and if it succeeds to make
606 142460 : // some room on the node, try appending to it again.
607 142460 : #[allow(clippy::collapsible_if)]
608 142460 : if last.compress() {
609 72345 : if last.push(key, value) {
610 72296 : return Ok(());
611 49 : }
612 70115 : }
613 :
614 : // Could not append to the current leaf. Flush it and create a new one.
615 70164 : self.flush_node()?;
616 :
617 : // Replace the node we flushed with an empty one and append the new
618 : // key to it.
619 70164 : let mut last = BuildNode::new(level);
620 70164 : if !last.push(key, value) {
621 0 : return Err(DiskBtreeError::FailedToPushToNewLeafNode);
622 70164 : }
623 70164 :
624 70164 : self.stack.push(last);
625 70164 :
626 70164 : Ok(())
627 40567024 : }
628 :
629 : /// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
630 : /// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
631 : /// creates a new root page, increasing the height of the tree.
632 74988 : fn flush_node(&mut self) -> Result<()> {
633 74988 : // Get the current bottommost node in the stack and flush it to disk.
634 74988 : let last = self
635 74988 : .stack
636 74988 : .pop()
637 74988 : .expect("should always have at least one item");
638 74988 : let buf = last.pack();
639 74988 : let downlink_key = last.first_key();
640 74988 : let downlink_ptr = self.writer.write_blk(buf)?;
641 :
642 : // Append the downlink to the parent. If there is no parent, ie. this was the root page,
643 : // create a new root page, increasing the height of the tree.
644 74988 : if self.stack.is_empty() {
645 4824 : self.stack.push(BuildNode::new(last.level + 1));
646 70164 : }
647 74988 : self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
648 74988 : }
649 :
650 : ///
651 : /// Flushes everything to disk, and returns the block number of the root page.
652 : /// The caller must store the root block number "out-of-band", and pass it
653 : /// to the DiskBtreeReader::new() when you want to read the tree again.
654 : /// (In the image and delta layers, it is stored in the beginning of the file,
655 : /// in the summary header)
656 : ///
657 11040 : pub fn finish(mut self) -> Result<(u32, W)> {
658 : // flush all levels, except the root.
659 15864 : while self.stack.len() > 1 {
660 4824 : self.flush_node()?;
661 : }
662 :
663 11040 : let root = self
664 11040 : .stack
665 11040 : .first()
666 11040 : .expect("by the check above we left one item there");
667 11040 : let buf = root.pack();
668 11040 : let root_blknum = self.writer.write_blk(buf)?;
669 :
670 11040 : Ok((root_blknum, self.writer))
671 11040 : }
672 :
673 12285996 : pub fn borrow_writer(&self) -> &W {
674 12285996 : &self.writer
675 12285996 : }
676 : }
677 :
678 : ///
679 : /// BuildNode represesnts an incomplete page that we are appending to.
680 : ///
681 : #[derive(Clone, Debug)]
682 : struct BuildNode<const L: usize> {
683 : num_children: u16,
684 : level: u8,
685 : prefix: Vec<u8>,
686 : suffix_len: usize,
687 :
688 : keys: Vec<u8>,
689 : values: Vec<u8>,
690 :
691 : size: usize, // physical size of this node, if it was written to disk like this
692 : }
693 :
694 : const NODE_SIZE: usize = PAGE_SZ;
695 :
696 : const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
697 :
698 : impl<const L: usize> BuildNode<L> {
699 87660 : fn new(level: u8) -> Self {
700 87660 : BuildNode {
701 87660 : num_children: 0,
702 87660 : level,
703 87660 : prefix: Vec::new(),
704 87660 : suffix_len: 0,
705 87660 : keys: Vec::new(),
706 87660 : values: Vec::new(),
707 87660 : size: NODE_HDR_SIZE,
708 87660 : }
709 87660 : }
710 :
711 : /// Try to append a key-value pair to this node. Returns 'true' on
712 : /// success, 'false' if the page was full or the key was
713 : /// incompatible with the prefix of the existing keys.
714 40709533 : fn push(&mut self, key: &[u8; L], value: Value) -> bool {
715 40709533 : // If we have already performed prefix-compression on the page,
716 40709533 : // check that the incoming key has the same prefix.
717 40709533 : if self.num_children > 0 {
718 : // does the prefix allow it?
719 40623205 : if !key.starts_with(&self.prefix) {
720 1357 : return false;
721 40621848 : }
722 86328 : } else {
723 86328 : self.suffix_len = key.len();
724 86328 : }
725 :
726 : // Is the node too full?
727 40708176 : if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
728 141152 : return false;
729 40567024 : }
730 40567024 :
731 40567024 : // All clear
732 40567024 : self.num_children += 1;
733 40567024 : self.keys.extend(&key[self.prefix.len()..]);
734 40567024 : self.values.extend(value.0);
735 40567024 :
736 40567024 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
737 40567024 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
738 :
739 40567024 : self.size += self.suffix_len + VALUE_SZ;
740 40567024 :
741 40567024 : true
742 40709533 : }
743 :
744 : ///
745 : /// Perform prefix-compression.
746 : ///
747 : /// Returns 'true' on success, 'false' if no compression was possible.
748 : ///
749 142460 : fn compress(&mut self) -> bool {
750 142460 : let first_suffix = self.first_suffix();
751 142460 : let last_suffix = self.last_suffix();
752 142460 :
753 142460 : // Find the common prefix among all keys
754 142460 : let mut prefix_len = 0;
755 1296497 : while prefix_len < self.suffix_len {
756 1296497 : if first_suffix[prefix_len] != last_suffix[prefix_len] {
757 142460 : break;
758 1154037 : }
759 1154037 : prefix_len += 1;
760 : }
761 142460 : if prefix_len == 0 {
762 70115 : return false;
763 72345 : }
764 72345 :
765 72345 : // Can compress. Rewrite the keys without the common prefix.
766 72345 : self.prefix.extend(&self.keys[..prefix_len]);
767 72345 :
768 72345 : let mut new_keys = Vec::new();
769 72345 : let mut key_off = 0;
770 19355223 : while key_off < self.keys.len() {
771 19282878 : let next_key_off = key_off + self.suffix_len;
772 19282878 : new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
773 19282878 : key_off = next_key_off;
774 19282878 : }
775 72345 : self.keys = new_keys;
776 72345 : self.suffix_len -= prefix_len;
777 72345 :
778 72345 : self.size -= prefix_len * self.num_children as usize;
779 72345 : self.size += prefix_len;
780 72345 :
781 72345 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
782 72345 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
783 :
784 72345 : true
785 142460 : }
786 :
787 : ///
788 : /// Serialize the node to on-disk format.
789 : ///
790 86028 : fn pack(&self) -> Bytes {
791 86028 : assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
792 86028 : assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
793 86028 : assert!(self.num_children > 0);
794 :
795 86028 : let mut buf = BytesMut::new();
796 86028 :
797 86028 : buf.put_u16(self.num_children);
798 86028 : buf.put_u8(self.level);
799 86028 : buf.put_u8(self.prefix.len() as u8);
800 86028 : buf.put_u8(self.suffix_len as u8);
801 86028 : buf.put(&self.prefix[..]);
802 86028 : buf.put(&self.keys[..]);
803 86028 : buf.put(&self.values[..]);
804 86028 :
805 86028 : assert!(buf.len() == self.size);
806 :
807 86028 : assert!(buf.len() <= PAGE_SZ);
808 86028 : buf.resize(PAGE_SZ, 0);
809 86028 : buf.freeze()
810 86028 : }
811 :
812 217448 : fn first_suffix(&self) -> &[u8] {
813 217448 : &self.keys[..self.suffix_len]
814 217448 : }
815 142460 : fn last_suffix(&self) -> &[u8] {
816 142460 : &self.keys[self.keys.len() - self.suffix_len..]
817 142460 : }
818 :
819 : /// Return the full first key of the page, including the prefix
820 74988 : fn first_key(&self) -> [u8; L] {
821 74988 : let mut key = [0u8; L];
822 74988 : key[..self.prefix.len()].copy_from_slice(&self.prefix);
823 74988 : key[self.prefix.len()..].copy_from_slice(self.first_suffix());
824 74988 : key
825 74988 : }
826 : }
827 :
828 : #[cfg(test)]
829 : pub(crate) mod tests {
830 : use std::collections::BTreeMap;
831 : use std::sync::atomic::{AtomicUsize, Ordering};
832 :
833 : use rand::Rng;
834 :
835 : use super::*;
836 : use crate::context::DownloadBehavior;
837 : use crate::task_mgr::TaskKind;
838 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
839 :
840 : #[derive(Clone, Default)]
841 : pub(crate) struct TestDisk {
842 : blocks: Vec<Bytes>,
843 : }
844 : impl TestDisk {
845 60 : fn new() -> Self {
846 60 : Self::default()
847 60 : }
848 6100714 : pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
849 6100714 : let mut buf = [0u8; PAGE_SZ];
850 6100714 : buf.copy_from_slice(&self.blocks[blknum as usize]);
851 6100714 : Ok(std::sync::Arc::new(buf).into())
852 6100714 : }
853 : }
854 : impl BlockReader for TestDisk {
855 2467384 : fn block_cursor(&self) -> BlockCursor<'_> {
856 2467384 : BlockCursor::new(BlockReaderRef::TestDisk(self))
857 2467384 : }
858 : }
859 : impl BlockWriter for &mut TestDisk {
860 1297 : fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
861 1297 : let blknum = self.blocks.len();
862 1297 : self.blocks.push(buf);
863 1297 : Ok(blknum as u32)
864 1297 : }
865 : }
866 :
867 : #[tokio::test]
868 12 : async fn basic() -> Result<()> {
869 12 : let mut disk = TestDisk::new();
870 12 : let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
871 12 :
872 12 : let ctx =
873 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
874 12 :
875 12 : let all_keys: Vec<&[u8; 6]> = vec![
876 12 : b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
877 12 : ];
878 12 : let all_data: Vec<(&[u8; 6], u64)> = all_keys
879 12 : .iter()
880 12 : .enumerate()
881 96 : .map(|(idx, key)| (*key, idx as u64))
882 12 : .collect();
883 96 : for (key, val) in all_data.iter() {
884 96 : writer.append(key, *val)?;
885 12 : }
886 12 :
887 12 : let (root_offset, _writer) = writer.finish()?;
888 12 :
889 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
890 12 :
891 12 : reader.dump(&ctx).await?;
892 12 :
893 12 : // Test the `get` function on all the keys.
894 96 : for (key, val) in all_data.iter() {
895 96 : assert_eq!(reader.get(key, &ctx).await?, Some(*val));
896 12 : }
897 12 : // And on some keys that don't exist
898 12 : assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
899 12 : assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
900 12 : assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
901 12 :
902 12 : // Test search with `visit` function
903 12 : let search_key = b"xabaaa";
904 12 : let expected: Vec<(Vec<u8>, u64)> = all_data
905 12 : .iter()
906 96 : .filter(|(key, _value)| key[..] >= search_key[..])
907 60 : .map(|(key, value)| (key.to_vec(), *value))
908 12 : .collect();
909 12 :
910 12 : let mut data = Vec::new();
911 12 : reader
912 12 : .visit(
913 12 : search_key,
914 12 : VisitDirection::Forwards,
915 60 : |key, value| {
916 60 : data.push((key.to_vec(), value));
917 60 : true
918 60 : },
919 12 : &ctx,
920 12 : )
921 12 : .await?;
922 12 : assert_eq!(data, expected);
923 12 :
924 12 : // Test a backwards scan
925 12 : let mut expected: Vec<(Vec<u8>, u64)> = all_data
926 12 : .iter()
927 96 : .filter(|(key, _value)| key[..] <= search_key[..])
928 48 : .map(|(key, value)| (key.to_vec(), *value))
929 12 : .collect();
930 12 : expected.reverse();
931 12 : let mut data = Vec::new();
932 12 : reader
933 12 : .visit(
934 12 : search_key,
935 12 : VisitDirection::Backwards,
936 48 : |key, value| {
937 48 : data.push((key.to_vec(), value));
938 48 : true
939 48 : },
940 12 : &ctx,
941 12 : )
942 12 : .await?;
943 12 : assert_eq!(data, expected);
944 12 :
945 12 : // Backward scan where nothing matches
946 12 : reader
947 12 : .visit(
948 12 : b"aaaaaa",
949 12 : VisitDirection::Backwards,
950 12 : |key, value| {
951 0 : panic!("found unexpected key {}: {}", hex::encode(key), value);
952 12 : },
953 12 : &ctx,
954 12 : )
955 12 : .await?;
956 12 :
957 12 : // Full scan
958 12 : let expected: Vec<(Vec<u8>, u64)> = all_data
959 12 : .iter()
960 96 : .map(|(key, value)| (key.to_vec(), *value))
961 12 : .collect();
962 12 : let mut data = Vec::new();
963 12 : reader
964 12 : .visit(
965 12 : &[0u8; 6],
966 12 : VisitDirection::Forwards,
967 96 : |key, value| {
968 96 : data.push((key.to_vec(), value));
969 96 : true
970 96 : },
971 12 : &ctx,
972 12 : )
973 12 : .await?;
974 12 : assert_eq!(data, expected);
975 12 :
976 12 : Ok(())
977 12 : }
978 :
979 : #[tokio::test]
980 12 : async fn lots_of_keys() -> Result<()> {
981 12 : let mut disk = TestDisk::new();
982 12 : let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
983 12 : let ctx =
984 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
985 12 :
986 12 : const NUM_KEYS: u64 = 1000;
987 12 :
988 12 : let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
989 12 :
990 12012 : for idx in 0..NUM_KEYS {
991 12000 : let key_int: u64 = 1 + idx * 2;
992 12000 : let key = u64::to_be_bytes(key_int);
993 12000 : writer.append(&key, idx)?;
994 12 :
995 12000 : all_data.insert(key_int, idx);
996 12 : }
997 12 :
998 12 : let (root_offset, _writer) = writer.finish()?;
999 12 :
1000 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1001 12 :
1002 12 : reader.dump(&ctx).await?;
1003 12 :
1004 12 : use std::sync::Mutex;
1005 12 :
1006 12 : let result = Mutex::new(Vec::new());
1007 12 : let limit: AtomicUsize = AtomicUsize::new(10);
1008 502920 : let take_ten = |key: &[u8], value: u64| {
1009 502920 : let mut keybuf = [0u8; 8];
1010 502920 : keybuf.copy_from_slice(key);
1011 502920 : let key_int = u64::from_be_bytes(keybuf);
1012 502920 :
1013 502920 : let mut result = result.lock().unwrap();
1014 502920 : result.push((key_int, value));
1015 502920 :
1016 502920 : // keep going until we have 10 matches
1017 502920 : result.len() < limit.load(Ordering::Relaxed)
1018 502920 : };
1019 12 :
1020 24120 : for search_key_int in 0..(NUM_KEYS * 2 + 10) {
1021 24120 : let search_key = u64::to_be_bytes(search_key_int);
1022 24120 : assert_eq!(
1023 24120 : reader.get(&search_key, &ctx).await?,
1024 24120 : all_data.get(&search_key_int).cloned()
1025 12 : );
1026 12 :
1027 12 : // Test a forward scan starting with this key
1028 24120 : result.lock().unwrap().clear();
1029 24120 : reader
1030 24120 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1031 24120 : .await?;
1032 24120 : let expected = all_data
1033 24120 : .range(search_key_int..)
1034 24120 : .take(10)
1035 238920 : .map(|(&key, &val)| (key, val))
1036 24120 : .collect::<Vec<(u64, u64)>>();
1037 24120 : assert_eq!(*result.lock().unwrap(), expected);
1038 12 :
1039 12 : // And a backwards scan
1040 24120 : result.lock().unwrap().clear();
1041 24120 : reader
1042 24120 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1043 24120 : .await?;
1044 24120 : let expected = all_data
1045 24120 : .range(..=search_key_int)
1046 24120 : .rev()
1047 24120 : .take(10)
1048 240000 : .map(|(&key, &val)| (key, val))
1049 24120 : .collect::<Vec<(u64, u64)>>();
1050 24120 : assert_eq!(*result.lock().unwrap(), expected);
1051 12 : }
1052 12 :
1053 12 : // full scan
1054 12 : let search_key = u64::to_be_bytes(0);
1055 12 : limit.store(usize::MAX, Ordering::Relaxed);
1056 12 : result.lock().unwrap().clear();
1057 12 : reader
1058 12 : .visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
1059 12 : .await?;
1060 12 : let expected = all_data
1061 12 : .iter()
1062 12000 : .map(|(&key, &val)| (key, val))
1063 12 : .collect::<Vec<(u64, u64)>>();
1064 12 : assert_eq!(*result.lock().unwrap(), expected);
1065 12 :
1066 12 : // full scan
1067 12 : let search_key = u64::to_be_bytes(u64::MAX);
1068 12 : limit.store(usize::MAX, Ordering::Relaxed);
1069 12 : result.lock().unwrap().clear();
1070 12 : reader
1071 12 : .visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
1072 12 : .await?;
1073 12 : let expected = all_data
1074 12 : .iter()
1075 12 : .rev()
1076 12000 : .map(|(&key, &val)| (key, val))
1077 12 : .collect::<Vec<(u64, u64)>>();
1078 12 : assert_eq!(*result.lock().unwrap(), expected);
1079 12 :
1080 12 : Ok(())
1081 12 : }
1082 :
1083 : #[tokio::test]
1084 12 : async fn random_data() -> Result<()> {
1085 12 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1086 12 :
1087 12 : // Generate random keys with exponential distribution, to
1088 12 : // exercise the prefix compression
1089 12 : const NUM_KEYS: usize = 100000;
1090 12 : let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
1091 1200012 : for idx in 0..NUM_KEYS {
1092 1200000 : let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
1093 1200000 : let t = -(f64::ln(u));
1094 1200000 : let key_int = (t * 1000000.0) as u128;
1095 1200000 :
1096 1200000 : all_data.insert(key_int, idx as u64);
1097 1200000 : }
1098 12 :
1099 12 : // Build a tree from it
1100 12 : let mut disk = TestDisk::new();
1101 12 : let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
1102 12 :
1103 1170712 : for (&key, &val) in all_data.iter() {
1104 1170712 : writer.append(&u128::to_be_bytes(key), val)?;
1105 12 : }
1106 12 : let (root_offset, _writer) = writer.finish()?;
1107 12 :
1108 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1109 12 :
1110 12 : // Test get() operation on all the keys
1111 1170712 : for (&key, &val) in all_data.iter() {
1112 1170712 : let search_key = u128::to_be_bytes(key);
1113 1170712 : assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
1114 12 : }
1115 12 :
1116 12 : // Test get() operations on random keys, most of which will not exist
1117 1200012 : for _ in 0..100000 {
1118 1200000 : let key_int = rand::thread_rng().r#gen::<u128>();
1119 1200000 : let search_key = u128::to_be_bytes(key_int);
1120 1200000 : assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
1121 12 : }
1122 12 :
1123 12 : // Test boundary cases
1124 12 : assert!(
1125 12 : reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
1126 12 : == all_data.get(&u128::MIN).cloned()
1127 12 : );
1128 12 : assert!(
1129 12 : reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
1130 12 : == all_data.get(&u128::MAX).cloned()
1131 12 : );
1132 12 :
1133 12 : // Test iterator and get_stream API
1134 12 : let mut iter = reader.iter(&[0; 16], &ctx);
1135 12 : let mut cnt = 0;
1136 1170724 : while let Some(res) = iter.next().await {
1137 1170712 : let (key, val) = res?;
1138 1170712 : let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
1139 1170712 : assert_eq!(val, *all_data.get(&key).unwrap());
1140 1170712 : cnt += 1;
1141 12 : }
1142 12 : assert_eq!(cnt, all_data.len());
1143 12 :
1144 12 : Ok(())
1145 12 : }
1146 :
/// Appending a key that sorts before the previously appended key must fail
/// with `DiskBtreeError::UnsortedInput`, reporting both the offending key and
/// the last accepted key.
#[test]
fn unsorted_input() {
    let mut disk = TestDisk::new();
    let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);

    // The setup appends are in ascending order and must succeed; fail loudly
    // here instead of discarding their results, so that a broken setup is not
    // mistaken for (or masked by) the failure this test is actually about.
    writer
        .append(b"ba", 1)
        .expect("appending the first key should succeed");
    writer
        .append(b"bb", 2)
        .expect("appending a key in ascending order should succeed");

    let err = writer.append(b"aa", 3).expect_err("should've failed");
    match err {
        DiskBtreeError::UnsortedInput { key, last_key } => {
            assert_eq!(key.as_ref(), b"aa".as_slice());
            assert_eq!(last_key.as_ref(), b"bb".as_slice());
        }
        // Include the actual error to make an unexpected variant diagnosable.
        other => panic!(
            "unexpected error variant, expected DiskBtreeError::UnsortedInput, got {other:?}"
        ),
    }
}
1163 :
1164 : ///
1165 : /// This test contains a particular data set, see disk_btree_test_data.rs
1166 : ///
1167 : #[tokio::test]
1168 12 : async fn particular_data() -> Result<()> {
1169 12 : // Build a tree from it
1170 12 : let mut disk = TestDisk::new();
1171 12 : let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
1172 12 : let ctx =
1173 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1174 12 :
1175 24012 : for (key, val) in disk_btree_test_data::TEST_DATA {
1176 24000 : writer.append(&key, val)?;
1177 12 : }
1178 12 : let (root_offset, writer) = writer.finish()?;
1179 12 :
1180 12 : println!("SIZE: {} blocks", writer.blocks.len());
1181 12 :
1182 12 : let reader = DiskBtreeReader::new(0, root_offset, disk);
1183 12 :
1184 12 : // Test get() operation on all the keys
1185 24012 : for (key, val) in disk_btree_test_data::TEST_DATA {
1186 24000 : assert_eq!(reader.get(&key, &ctx).await?, Some(val));
1187 12 : }
1188 12 :
1189 12 : // Test full scan
1190 12 : let mut count = 0;
1191 12 : reader
1192 12 : .visit(
1193 12 : &[0u8; 26],
1194 12 : VisitDirection::Forwards,
1195 24000 : |_key, _value| {
1196 24000 : count += 1;
1197 24000 : true
1198 24000 : },
1199 12 : &ctx,
1200 12 : )
1201 12 : .await?;
1202 12 : assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
1203 12 :
1204 12 : reader.dump(&ctx).await?;
1205 12 :
1206 12 : Ok(())
1207 12 : }
1208 : }
1209 :
1210 : #[cfg(test)]
1211 : #[path = "disk_btree_test_data.rs"]
1212 : mod disk_btree_test_data;
|