Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`] which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
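//!
//! A rough sketch of the planning flow (marked `ignore` since it is not a runnable
//! doctest; `blob_offsets`, `max_read_size` and `range_end_offset` are assumed to be
//! provided by the caller):
//!
//! ```ignore
//! let mut planner = VectoredReadPlanner::new(max_read_size);
//! for (key, lsn, offset, flag) in blob_offsets {
//!     planner.handle(key, lsn, offset, flag);
//! }
//! planner.handle_range_end(range_end_offset);
//!
//! // Each `VectoredRead` can then be executed with `VectoredBlobReader::read_blobs`.
//! let reads: Vec<VectoredRead> = planner.finish();
//! ```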
17 :
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
30 : use crate::virtual_file::VirtualFile;
31 :
32 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
33 : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
34 :
35 : /// Metadata bundled with the start and end offset of a blob.
36 : #[derive(Copy, Clone, Debug)]
37 : pub struct BlobMeta {
38 : pub key: Key,
39 : pub lsn: Lsn,
40 : }
41 :
42 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
43 : pub struct VectoredBlob {
44 : pub start: usize,
45 : pub end: usize,
46 : pub meta: BlobMeta,
47 : }
48 :
49 : /// Return type of [`VectoredBlobReader::read_blobs`]
50 : pub struct VectoredBlobsBuf {
51 : /// Buffer for all blobs in this read
52 : pub buf: BytesMut,
53 : /// Offsets into the buffer and metadata for all blobs in this read
54 : pub blobs: Vec<VectoredBlob>,
55 : }
56 :
57 : /// Description of one disk read for multiple blobs.
58 : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
59 : #[derive(Debug)]
60 : pub struct VectoredRead {
61 : pub start: u64,
62 : pub end: u64,
63 : /// Starting offsets and metadata for each blob in this read
64 : pub blobs_at: VecMap<u64, BlobMeta>,
65 : }
66 :
67 : impl VectoredRead {
68 966831 : pub(crate) fn size(&self) -> usize {
69 966831 : (self.end - self.start) as usize
70 966831 : }
71 : }
72 :
73 : #[derive(Eq, PartialEq, Debug)]
74 : pub(crate) enum VectoredReadExtended {
75 : Yes,
76 : No,
77 : }
78 :
79 : pub(crate) struct VectoredReadBuilder {
80 : start: u64,
81 : end: u64,
82 : blobs_at: VecMap<u64, BlobMeta>,
83 : max_read_size: Option<usize>,
84 : }
85 :
86 : impl VectoredReadBuilder {
87 : /// Start building a new vectored read.
88 : ///
89 : /// Note that, by design, this does not check against reading more than `max_read_size`, so
90 : /// that blobs larger than the configured value can still be read. In that case, however,
91 : /// the builder is single use: the read cannot be extended any further.
92 172056 : pub(crate) fn new(
93 172056 : start_offset: u64,
94 172056 : end_offset: u64,
95 172056 : meta: BlobMeta,
96 172056 : max_read_size: usize,
97 172056 : ) -> Self {
98 172056 : let mut blobs_at = VecMap::default();
99 172056 : blobs_at
100 172056 : .append(start_offset, meta)
101 172056 : .expect("First insertion always succeeds");
102 172056 :
103 172056 : Self {
104 172056 : start: start_offset,
105 172056 : end: end_offset,
106 172056 : blobs_at,
107 172056 : max_read_size: Some(max_read_size),
108 172056 : }
109 172056 : }
110 : /// Attempt to extend the current read with a new blob if the start
111 : /// offset matches the current end of the vectored read
112 : /// and the resulting size is below the max read size.
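///
/// A sketch of the expected behaviour (crate-internal type, so not a runnable
/// doctest; `meta` is some [`BlobMeta`]):
///
/// ```ignore
/// let mut builder = VectoredReadBuilder::new(0, 1024, meta, 128 * 1024);
/// // Contiguous with the current end of the read: the read grows.
/// assert_eq!(builder.extend(1024, 2048, meta), VectoredReadExtended::Yes);
/// // A gap between 2048 and 4096: the caller must start a new read instead.
/// assert_eq!(builder.extend(4096, 8192, meta), VectoredReadExtended::No);
/// ```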
113 2444560 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
114 2444560 : tracing::trace!(start, end, "trying to extend");
115 2444560 : let size = (end - start) as usize;
116 2444560 : if self.end == start && {
117 2425641 : if let Some(max_read_size) = self.max_read_size {
118 338773 : self.size() + size <= max_read_size
119 : } else {
120 2086868 : true
121 : }
122 : } {
123 2406275 : self.end = end;
124 2406275 : self.blobs_at
125 2406275 : .append(start, meta)
126 2406275 : .expect("LSNs are ordered within vectored reads");
127 2406275 :
128 2406275 : return VectoredReadExtended::Yes;
129 38285 : }
130 38285 :
131 38285 : VectoredReadExtended::No
132 2444560 : }
133 :
134 2465289 : pub(crate) fn size(&self) -> usize {
135 2465289 : (self.end - self.start) as usize
136 2465289 : }
137 :
138 212270 : pub(crate) fn build(self) -> VectoredRead {
139 212270 : VectoredRead {
140 212270 : start: self.start,
141 212270 : end: self.end,
142 212270 : blobs_at: self.blobs_at,
143 212270 : }
144 212270 : }
145 : }
146 :
147 : #[derive(Copy, Clone, Debug)]
148 : pub enum BlobFlag {
149 : None,
150 : Ignore,
151 : ReplaceAll,
152 : }
153 :
154 : /// Planner for vectored blob reads.
155 : ///
156 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
157 : /// and coalesced into disk reads.
158 : ///
159 : /// The implementation is very simple:
160 : /// * Collect all blob offsets in an ordered structure
161 : /// * Iterate over the collected blobs and coalesce them into reads at the end
162 : pub struct VectoredReadPlanner {
163 : // Track all the blob offsets. Start offsets must be ordered.
164 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
165 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
166 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
167 :
168 : max_read_size: usize,
169 : }
170 :
171 : impl VectoredReadPlanner {
172 212119 : pub fn new(max_read_size: usize) -> Self {
173 212119 : Self {
174 212119 : blobs: BTreeMap::new(),
175 212119 : prev: None,
176 212119 : max_read_size,
177 212119 : }
178 212119 : }
179 :
180 : /// Include a new blob in the read plan.
181 : ///
182 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
183 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
184 : /// keys in a given keyspace. This function must be called for each key in the desired
185 : /// keyspace (in monotonically increasing order). [`Self::handle_range_end`] must
186 : /// be called at the end of every range, passing that range's end offset.
187 : ///
188 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
189 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
190 : /// incorrect data to the user.
191 : ///
192 : /// The `flag` argument has two interesting values:
193 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
194 : /// This is used for WAL records that `will_init`.
195 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
196 : /// if the blob is cached.
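///
/// A sketch of the flag semantics (crate-internal, so not a runnable doctest;
/// `key`, the LSNs and the offsets are illustrative):
///
/// ```ignore
/// let mut planner = VectoredReadPlanner::new(max_read_size);
/// planner.handle(key, lsn_a, 0, BlobFlag::None);
/// // `ReplaceAll`: once this blob is added, earlier offsets recorded for `key` are dropped.
/// planner.handle(key, lsn_b, 1024, BlobFlag::ReplaceAll);
/// // `Ignore`: this blob is planned around, e.g. because it is already cached.
/// planner.handle(key, lsn_c, 2048, BlobFlag::Ignore);
/// planner.handle_range_end(3072);
/// // The resulting reads cover only the blob at [1024, 2048).
/// let reads = planner.finish();
/// ```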
197 1411758 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
198 : // Implementation note: internally we lag behind by one blob so that
199 : // we have both a start and an end offset when initialising [`VectoredRead`]
200 1411758 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
201 : None => {
202 136502 : self.prev = Some((key, lsn, offset, flag));
203 136502 : return;
204 : }
205 1275256 : Some(prev) => prev,
206 1275256 : };
207 1275256 :
208 1275256 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
209 1275256 :
210 1275256 : self.prev = Some((key, lsn, offset, flag));
211 1411758 : }
212 :
213 276129 : pub fn handle_range_end(&mut self, offset: u64) {
214 276129 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
215 136502 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
216 139627 : }
217 :
218 276129 : self.prev = None;
219 276129 : }
220 :
221 1411758 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
222 1411758 : match flag {
223 337709 : BlobFlag::None => {
224 337709 : let blobs_for_key = self.blobs.entry(key).or_default();
225 337709 : blobs_for_key.push((lsn, start_offset, end_offset));
226 337709 : }
227 256520 : BlobFlag::ReplaceAll => {
228 256520 : let blobs_for_key = self.blobs.entry(key).or_default();
229 256520 : blobs_for_key.clear();
230 256520 : blobs_for_key.push((lsn, start_offset, end_offset));
231 256520 : }
232 817529 : BlobFlag::Ignore => {}
233 : }
234 1411758 : }
235 :
236 212119 : pub fn finish(self) -> Vec<VectoredRead> {
237 212119 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
238 212119 : let mut reads = Vec::new();
239 :
240 648340 : for (key, blobs_for_key) in self.blobs {
241 923540 : for (lsn, start_offset, end_offset) in blobs_for_key {
242 487319 : let extended = match &mut current_read_builder {
243 357670 : Some(read_builder) => {
244 357670 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
245 : }
246 129649 : None => VectoredReadExtended::No,
247 : };
248 :
249 487319 : if extended == VectoredReadExtended::No {
250 167928 : let next_read_builder = VectoredReadBuilder::new(
251 167928 : start_offset,
252 167928 : end_offset,
253 167928 : BlobMeta { key, lsn },
254 167928 : self.max_read_size,
255 167928 : );
256 167928 :
257 167928 : let prev_read_builder = current_read_builder.replace(next_read_builder);
258 :
259 : // `current_read_builder` is None in the first iteration of the outer loop
260 167928 : if let Some(read_builder) = prev_read_builder {
261 38279 : reads.push(read_builder.build());
262 129649 : }
263 319391 : }
264 : }
265 : }
266 :
267 212119 : if let Some(read_builder) = current_read_builder {
268 129649 : reads.push(read_builder.build());
269 129649 : }
270 :
271 212119 : reads
272 212119 : }
273 : }
274 :
275 : /// Disk reader for vectored blob spans (does not go through the page cache)
276 : pub struct VectoredBlobReader<'a> {
277 : file: &'a VirtualFile,
278 : }
279 :
280 : impl<'a> VectoredBlobReader<'a> {
281 252325 : pub fn new(file: &'a VirtualFile) -> Self {
282 252325 : Self { file }
283 252325 : }
284 :
285 : /// Read the requested blobs into the buffer.
286 : ///
287 : /// We have to deal with the fact that blobs are not fixed size.
288 : /// Each blob is prefixed by a size header.
289 : ///
290 : /// The success return value is a struct which contains the buffer
291 : /// filled from disk and a list of offsets at which each blob lies
292 : /// in the buffer.
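///
/// A sketch of typical usage (crate-internal, so not a runnable doctest; `read` is a
/// [`VectoredRead`] produced by one of the planners in this module):
///
/// ```ignore
/// let reader = VectoredBlobReader::new(&virtual_file);
/// // The caller supplies a buffer with at least `read.size()` bytes of capacity.
/// let buf = BytesMut::with_capacity(read.size());
/// let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
/// for blob in &blobs_buf.blobs {
///     // Holds the (decompressed) bytes for `blob.meta.key` at `blob.meta.lsn`.
///     let data = &blobs_buf.buf[blob.start..blob.end];
/// }
/// ```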
293 212222 : pub async fn read_blobs(
294 212222 : &self,
295 212222 : read: &VectoredRead,
296 212222 : buf: BytesMut,
297 212222 : ctx: &RequestContext,
298 212222 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
299 212222 : assert!(read.size() > 0);
300 212222 : assert!(
301 212222 : read.size() <= buf.capacity(),
302 0 : "{} > {}",
303 0 : read.size(),
304 0 : buf.capacity()
305 : );
306 212222 : let mut buf = self
307 212222 : .file
308 212222 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
309 107886 : .await?
310 212222 : .into_inner();
311 212222 :
312 212222 : let blobs_at = read.blobs_at.as_slice();
313 212222 : let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
314 212222 :
315 212222 : let mut metas = Vec::with_capacity(blobs_at.len());
316 212222 :
317 212222 : // Blobs in `read` only provide their starting offset. The end offset
318 212222 : // of a blob is implicit: the start of the next blob if one exists
319 212222 : // or the end of the read.
320 212222 : let pairs = blobs_at.iter().zip(
321 212222 : blobs_at
322 212222 : .iter()
323 212222 : .map(Some)
324 212222 : .skip(1)
325 212222 : .chain(std::iter::once(None)),
326 212222 : );
327 212222 :
328 212222 : // Some scratch space, put here for reusing the allocation
329 212222 : let mut decompressed_vec = Vec::new();
330 :
331 2830673 : for ((offset, meta), next) in pairs {
332 2618451 : let offset_in_buf = offset - start_offset;
333 2618451 : let first_len_byte = buf[offset_in_buf as usize];
334 :
335 : // Each blob is prefixed by a header containing its size and compression information.
336 : // Extract the size and skip that header to find the start of the data.
337 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
338 : // 1 byte case and 1 in the 4 byte case.
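// For example, a first byte of 0x05 denotes an uncompressed 5 byte blob with a
// 1 byte header, while a first byte with the most significant bit set means the
// length occupies 4 big-endian bytes, with the compression scheme stored in the
// bits selected by `LEN_COMPRESSION_BIT_MASK`.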
339 2618451 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
340 2580759 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
341 : } else {
342 37692 : let mut blob_size_buf = [0u8; 4];
343 37692 : let offset_in_buf = offset_in_buf as usize;
344 37692 :
345 37692 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
346 37692 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
347 37692 :
348 37692 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
349 37692 : (
350 37692 : 4,
351 37692 : u32::from_be_bytes(blob_size_buf) as u64,
352 37692 : compression_bits,
353 37692 : )
354 : };
355 :
356 2618451 : let start_raw = offset_in_buf + size_length;
357 2618451 : let end_raw = match next {
358 2406229 : Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
359 212222 : None => start_raw + blob_size,
360 : };
361 2618451 : assert_eq!(end_raw - start_raw, blob_size);
362 : let (start, end);
363 2618451 : if compression_bits == BYTE_UNCOMPRESSED {
364 2618449 : start = start_raw as usize;
365 2618449 : end = end_raw as usize;
366 2618449 : } else if compression_bits == BYTE_ZSTD {
367 2 : let mut decoder =
368 2 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
369 2 : decoder
370 2 : .write_all(&buf[start_raw as usize..end_raw as usize])
371 0 : .await?;
372 2 : decoder.flush().await?;
373 2 : start = buf.len();
374 2 : buf.extend_from_slice(&decompressed_vec);
375 2 : end = buf.len();
376 2 : decompressed_vec.clear();
377 : } else {
378 0 : let error = std::io::Error::new(
379 0 : std::io::ErrorKind::InvalidData,
380 0 : format!("invalid compression byte {compression_bits:x}"),
381 0 : );
382 0 : return Err(error);
383 : }
384 :
385 2618451 : metas.push(VectoredBlob {
386 2618451 : start,
387 2618451 : end,
388 2618451 : meta: *meta,
389 2618451 : });
390 : }
391 :
392 212222 : Ok(VectoredBlobsBuf { buf, blobs: metas })
393 212222 : }
394 : }
395 :
396 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
397 : /// getting read blobs. `handle` returns a batch once the accumulated size reaches `max_read_size` or the number of
398 : /// blobs reaches `max_cnt`.
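///
/// A sketch of the streaming flow (crate-internal, so not a runnable doctest;
/// `blob_offsets` and `end_offset` are assumed to be provided by the caller):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
/// let mut reads = Vec::new();
/// for (key, lsn, offset) in blob_offsets {
///     // `handle` yields a batch once the size or count limit is reached.
///     reads.extend(planner.handle(key, lsn, offset));
/// }
/// // Flush whatever is still buffered into a final batch.
/// reads.extend(planner.handle_range_end(end_offset));
/// ```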
399 : pub struct StreamingVectoredReadPlanner {
400 : read_builder: Option<VectoredReadBuilder>,
401 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
402 : prev: Option<(Key, Lsn, u64)>,
403 : /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
404 : /// we will produce a single batch instead of splitting them.
405 : max_read_size: u64,
406 : /// Max item count per batch
407 : max_cnt: usize,
408 : /// Size of the current batch
409 : cnt: usize,
410 : }
411 :
412 : impl StreamingVectoredReadPlanner {
413 736 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
414 736 : assert!(max_cnt > 0);
415 736 : assert!(max_read_size > 0);
416 736 : Self {
417 736 : read_builder: None,
418 736 : prev: None,
419 736 : max_cnt,
420 736 : max_read_size,
421 736 : cnt: 0,
422 736 : }
423 736 : }
424 :
425 2127138 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
426 : // Implementation note: internally we lag behind by one blob so that
427 : // we have both a start and an end offset when initialising [`VectoredRead`]
428 2127138 : let (prev_key, prev_lsn, prev_offset) = match self.prev {
429 : None => {
430 622 : self.prev = Some((key, lsn, offset));
431 622 : return None;
432 : }
433 2126516 : Some(prev) => prev,
434 2126516 : };
435 2126516 :
436 2126516 : let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
437 2126516 :
438 2126516 : self.prev = Some((key, lsn, offset));
439 2126516 :
440 2126516 : res
441 2127138 : }
442 :
443 568 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
444 568 : let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
445 566 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
446 : } else {
447 2 : None
448 : };
449 :
450 568 : self.prev = None;
451 568 :
452 568 : res
453 568 : }
454 :
455 2127082 : fn add_blob(
456 2127082 : &mut self,
457 2127082 : key: Key,
458 2127082 : lsn: Lsn,
459 2127082 : start_offset: u64,
460 2127082 : end_offset: u64,
461 2127082 : is_last_blob_in_read: bool,
462 2127082 : ) -> Option<VectoredRead> {
463 2127082 : match &mut self.read_builder {
464 2086868 : Some(read_builder) => {
465 2086868 : let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
466 2086868 : assert_eq!(extended, VectoredReadExtended::Yes);
467 : }
468 40214 : None => {
469 40214 : self.read_builder = {
470 40214 : let mut blobs_at = VecMap::default();
471 40214 : blobs_at
472 40214 : .append(start_offset, BlobMeta { key, lsn })
473 40214 : .expect("First insertion always succeeds");
474 40214 :
475 40214 : Some(VectoredReadBuilder {
476 40214 : start: start_offset,
477 40214 : end: end_offset,
478 40214 : blobs_at,
479 40214 : max_read_size: None,
480 40214 : })
481 40214 : };
482 40214 : }
483 : }
484 2127082 : let read_builder = self.read_builder.as_mut().unwrap();
485 2127082 : self.cnt += 1;
486 2127082 : if is_last_blob_in_read
487 2126516 : || read_builder.size() >= self.max_read_size as usize
488 2098452 : || self.cnt >= self.max_cnt
489 : {
490 40214 : let prev_read_builder = self.read_builder.take();
491 40214 : self.cnt = 0;
492 :
493 : // `prev_read_builder` is always `Some` here, since `self.read_builder` was populated above
494 40214 : if let Some(read_builder) = prev_read_builder {
495 40214 : return Some(read_builder.build());
496 0 : }
497 2086868 : }
498 2086868 : None
499 2127082 : }
500 : }
501 :
502 : #[cfg(test)]
503 : mod tests {
504 : use anyhow::Error;
505 :
506 : use crate::context::DownloadBehavior;
507 : use crate::page_cache::PAGE_SZ;
508 : use crate::task_mgr::TaskKind;
509 :
510 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
511 : use super::*;
512 :
513 42 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
514 42 : assert_eq!(read.start, offset_range.first().unwrap().2);
515 :
516 66 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
517 42 :
518 42 : let offsets_in_read: Vec<_> = read
519 42 : .blobs_at
520 42 : .as_slice()
521 42 : .iter()
522 66 : .map(|(offset, _)| *offset)
523 42 : .collect();
524 42 :
525 42 : assert_eq!(expected_offsets_in_read, offsets_in_read);
526 42 : }
527 :
528 : #[test]
529 2 : fn planner_max_read_size_test() {
530 2 : let max_read_size = 128 * 1024;
531 2 : let key = Key::MIN;
532 2 : let lsn = Lsn(0);
533 2 :
534 2 : let blob_descriptions = vec![
535 2 : (key, lsn, 0, BlobFlag::None),
536 2 : (key, lsn, 32 * 1024, BlobFlag::None),
537 2 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
538 2 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
539 2 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
540 2 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
541 2 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
542 2 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
543 2 : ];
544 2 :
545 2 : let ranges = [
546 2 : &blob_descriptions[0..3],
547 2 : &blob_descriptions[3..4],
548 2 : &blob_descriptions[4..5],
549 2 : &blob_descriptions[5..6],
550 2 : &blob_descriptions[6..7],
551 2 : &blob_descriptions[7..],
552 2 : ];
553 2 :
554 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
555 16 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
556 16 : planner.handle(key, lsn, offset, flag);
557 16 : }
558 :
559 2 : planner.handle_range_end(652 * 1024);
560 2 :
561 2 : let reads = planner.finish();
562 2 :
563 2 : assert_eq!(reads.len(), 6);
564 :
565 : // TODO: could remove zero reads to produce 5 reads here
566 :
567 12 : for (idx, read) in reads.iter().enumerate() {
568 12 : validate_read(read, ranges[idx]);
569 12 : }
570 2 : }
571 :
572 : #[test]
573 2 : fn planner_replacement_test() {
574 2 : let max_read_size = 128 * 1024;
575 2 : let first_key = Key::MIN;
576 2 : let second_key = first_key.next();
577 2 : let lsn = Lsn(0);
578 2 :
579 2 : let blob_descriptions = vec![
580 2 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
581 2 : (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
582 2 : (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
583 2 : (second_key, lsn, 3 * 1024, BlobFlag::None),
584 2 : (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
585 2 : (second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
586 2 : ];
587 2 :
588 2 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
589 2 :
590 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
591 12 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
592 12 : planner.handle(key, lsn, offset, flag);
593 12 : }
594 :
595 2 : planner.handle_range_end(6 * 1024);
596 2 :
597 2 : let reads = planner.finish();
598 2 : assert_eq!(reads.len(), 2);
599 :
600 4 : for (idx, read) in reads.iter().enumerate() {
601 4 : validate_read(read, ranges[idx]);
602 4 : }
603 2 : }
604 :
605 : #[test]
606 2 : fn streaming_planner_max_read_size_test() {
607 2 : let max_read_size = 128 * 1024;
608 2 : let key = Key::MIN;
609 2 : let lsn = Lsn(0);
610 2 :
611 2 : let blob_descriptions = vec![
612 2 : (key, lsn, 0, BlobFlag::None),
613 2 : (key, lsn, 32 * 1024, BlobFlag::None),
614 2 : (key, lsn, 96 * 1024, BlobFlag::None),
615 2 : (key, lsn, 128 * 1024, BlobFlag::None),
616 2 : (key, lsn, 198 * 1024, BlobFlag::None),
617 2 : (key, lsn, 268 * 1024, BlobFlag::None),
618 2 : (key, lsn, 396 * 1024, BlobFlag::None),
619 2 : (key, lsn, 652 * 1024, BlobFlag::None),
620 2 : ];
621 2 :
622 2 : let ranges = [
623 2 : &blob_descriptions[0..3],
624 2 : &blob_descriptions[3..5],
625 2 : &blob_descriptions[5..6],
626 2 : &blob_descriptions[6..7],
627 2 : &blob_descriptions[7..],
628 2 : ];
629 2 :
630 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
631 2 : let mut reads = Vec::new();
632 16 : for (key, lsn, offset, _) in blob_descriptions.clone() {
633 16 : reads.extend(planner.handle(key, lsn, offset));
634 16 : }
635 2 : reads.extend(planner.handle_range_end(652 * 1024));
636 2 :
637 2 : assert_eq!(reads.len(), ranges.len());
638 :
639 10 : for (idx, read) in reads.iter().enumerate() {
640 10 : validate_read(read, ranges[idx]);
641 10 : }
642 2 : }
643 :
644 : #[test]
645 2 : fn streaming_planner_max_cnt_test() {
646 2 : let max_read_size = 1024 * 1024;
647 2 : let key = Key::MIN;
648 2 : let lsn = Lsn(0);
649 2 :
650 2 : let blob_descriptions = vec![
651 2 : (key, lsn, 0, BlobFlag::None),
652 2 : (key, lsn, 32 * 1024, BlobFlag::None),
653 2 : (key, lsn, 96 * 1024, BlobFlag::None),
654 2 : (key, lsn, 128 * 1024, BlobFlag::None),
655 2 : (key, lsn, 198 * 1024, BlobFlag::None),
656 2 : (key, lsn, 268 * 1024, BlobFlag::None),
657 2 : (key, lsn, 396 * 1024, BlobFlag::None),
658 2 : (key, lsn, 652 * 1024, BlobFlag::None),
659 2 : ];
660 2 :
661 2 : let ranges = [
662 2 : &blob_descriptions[0..2],
663 2 : &blob_descriptions[2..4],
664 2 : &blob_descriptions[4..6],
665 2 : &blob_descriptions[6..],
666 2 : ];
667 2 :
668 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
669 2 : let mut reads = Vec::new();
670 16 : for (key, lsn, offset, _) in blob_descriptions.clone() {
671 16 : reads.extend(planner.handle(key, lsn, offset));
672 16 : }
673 2 : reads.extend(planner.handle_range_end(652 * 1024));
674 2 :
675 2 : assert_eq!(reads.len(), ranges.len());
676 :
677 8 : for (idx, read) in reads.iter().enumerate() {
678 8 : validate_read(read, ranges[idx]);
679 8 : }
680 2 : }
681 :
682 : #[test]
683 2 : fn streaming_planner_edge_test() {
684 2 : let max_read_size = 1024 * 1024;
685 2 : let key = Key::MIN;
686 2 : let lsn = Lsn(0);
687 2 : {
688 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
689 2 : let mut reads = Vec::new();
690 2 : reads.extend(planner.handle_range_end(652 * 1024));
691 2 : assert!(reads.is_empty());
692 : }
693 : {
694 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
695 2 : let mut reads = Vec::new();
696 2 : reads.extend(planner.handle(key, lsn, 0));
697 2 : reads.extend(planner.handle_range_end(652 * 1024));
698 2 : assert_eq!(reads.len(), 1);
699 2 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
700 2 : }
701 2 : {
702 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
703 2 : let mut reads = Vec::new();
704 2 : reads.extend(planner.handle(key, lsn, 0));
705 2 : reads.extend(planner.handle(key, lsn, 128 * 1024));
706 2 : reads.extend(planner.handle_range_end(652 * 1024));
707 2 : assert_eq!(reads.len(), 2);
708 2 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
709 2 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
710 2 : }
711 2 : {
712 2 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
713 2 : let mut reads = Vec::new();
714 2 : reads.extend(planner.handle(key, lsn, 0));
715 2 : reads.extend(planner.handle(key, lsn, 128 * 1024));
716 2 : reads.extend(planner.handle_range_end(652 * 1024));
717 2 : assert_eq!(reads.len(), 1);
718 2 : validate_read(
719 2 : &reads[0],
720 2 : &[
721 2 : (key, lsn, 0, BlobFlag::None),
722 2 : (key, lsn, 128 * 1024, BlobFlag::None),
723 2 : ],
724 2 : );
725 2 : }
726 2 : }
727 :
728 8 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
729 8 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
730 8 : let (_temp_dir, pathbuf, offsets) =
731 279 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
732 :
733 8 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
734 8 : let file_len = std::fs::metadata(&pathbuf)?.len();
735 8 :
736 8 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
737 4120 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
738 8 : let mut buf = BytesMut::with_capacity(reserved_bytes);
739 8 :
740 8 : let vectored_blob_reader = VectoredBlobReader::new(&file);
741 8 : let meta = BlobMeta {
742 8 : key: Key::MIN,
743 8 : lsn: Lsn(0),
744 8 : };
745 :
746 4120 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
747 4120 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
748 4120 : if idx + 1 == offsets.len() {
749 8 : continue;
750 4112 : }
751 4112 : let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
752 4112 : let read = read_builder.build();
753 4112 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
754 4112 : assert_eq!(result.blobs.len(), 1);
755 4112 : let read_blob = &result.blobs[0];
756 4112 : let read_buf = &result.buf[read_blob.start..read_blob.end];
757 4112 : assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
758 4112 : buf = result.buf;
759 : }
760 8 : Ok(())
761 8 : }
762 :
763 : #[tokio::test]
764 2 : async fn test_really_big_array() -> Result<(), Error> {
765 2 : let blobs = &[
766 2 : b"test".to_vec(),
767 2 : random_array(10 * PAGE_SZ),
768 2 : b"hello".to_vec(),
769 2 : random_array(66 * PAGE_SZ),
770 2 : vec![0xf3; 24 * PAGE_SZ],
771 2 : b"foobar".to_vec(),
772 2 : ];
773 14 : round_trip_test_compressed(blobs, false).await?;
774 11 : round_trip_test_compressed(blobs, true).await?;
775 2 : Ok(())
776 2 : }
777 :
778 : #[tokio::test]
779 2 : async fn test_arrays_inc() -> Result<(), Error> {
780 2 : let blobs = (0..PAGE_SZ / 8)
781 2048 : .map(|v| random_array(v * 16))
782 2 : .collect::<Vec<_>>();
783 1172 : round_trip_test_compressed(&blobs, false).await?;
784 1172 : round_trip_test_compressed(&blobs, true).await?;
785 2 : Ok(())
786 2 : }
787 : }