LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 98.3 % 697 685
Test Date: 2024-09-19 12:04:32 Functions: 100.0 % 47 47

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
      12              : //! adjacent blocks into a single disk IO request and executed by
      13              : //! [`VectoredBlobReader`] which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : 
      20              : use bytes::BytesMut;
      21              : use pageserver_api::key::Key;
      22              : use tokio::io::AsyncWriteExt;
      23              : use tokio_epoll_uring::BoundedBuf;
      24              : use utils::lsn::Lsn;
      25              : use utils::vec_map::VecMap;
      26              : 
      27              : use crate::context::RequestContext;
      28              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
      29              : use crate::virtual_file::{self, VirtualFile};
      30              : 
      31              : /// Metadata bundled with the start and end offset of a blob.
      32              : #[derive(Copy, Clone, Debug)]
      33              : pub struct BlobMeta {
      34              :     pub key: Key,
      35              :     pub lsn: Lsn,
      36              : }
      37              : 
      38              : /// Blob offsets into [`VectoredBlobsBuf::buf`]
      39              : pub struct VectoredBlob {
      40              :     pub start: usize,
      41              :     pub end: usize,
      42              :     pub meta: BlobMeta,
      43              : }
      44              : 
      45              : /// Return type of [`VectoredBlobReader::read_blobs`]
      46              : pub struct VectoredBlobsBuf {
      47              :     /// Buffer for all blobs in this read
      48              :     pub buf: BytesMut,
      49              :     /// Offsets into the buffer and metadata for all blobs in this read
      50              :     pub blobs: Vec<VectoredBlob>,
      51              : }
      52              : 
      53              : /// Description of one disk read for multiple blobs.
      54              : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
      55              : #[derive(Debug)]
      56              : pub struct VectoredRead {
      57              :     pub start: u64,
      58              :     pub end: u64,
      59              :     /// Start offset and metadata for each blob in this read
      60              :     pub blobs_at: VecMap<u64, BlobMeta>,
      61              : }
      62              : 
      63              : impl VectoredRead {
      64      2852696 :     pub(crate) fn size(&self) -> usize {
      65      2852696 :         (self.end - self.start) as usize
      66      2852696 :     }
      67              : }
      68              : 
      69              : #[derive(Eq, PartialEq, Debug)]
      70              : pub(crate) enum VectoredReadExtended {
      71              :     Yes,
      72              :     No,
      73              : }
      74              : 
      75              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      76              : pub enum VectoredReadCoalesceMode {
      77              :     /// Only coalesce exactly adjacent reads.
      78              :     AdjacentOnly,
      79              :     /// In addition to adjacent reads, also consider reads whose corresponding
      80              :     /// `end` and `start` offsets reside at the same chunk.
      81              :     Chunked(usize),
      82              : }
      83              : 
      84              : impl VectoredReadCoalesceMode {
      85              :     /// [`AdjacentVectoredReadBuilder`] is used if alignment requirement is 0,
      86              :     /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirement 1 and higher.
      87       642594 :     pub(crate) fn get() -> Self {
      88       642594 :         let align = virtual_file::get_io_buffer_alignment_raw();
      89       642594 :         if align == 0 {
      90       214116 :             VectoredReadCoalesceMode::AdjacentOnly
      91              :         } else {
      92       428478 :             VectoredReadCoalesceMode::Chunked(align)
      93              :         }
      94       642594 :     }
      95              : }
      96              : 
      97              : pub(crate) enum VectoredReadBuilder {
      98              :     Adjacent(AdjacentVectoredReadBuilder),
      99              :     Chunked(ChunkedVectoredReadBuilder),
     100              : }
     101              : 
     102              : impl VectoredReadBuilder {
     103       624645 :     fn new_impl(
     104       624645 :         start_offset: u64,
     105       624645 :         end_offset: u64,
     106       624645 :         meta: BlobMeta,
     107       624645 :         max_read_size: Option<usize>,
     108       624645 :         mode: VectoredReadCoalesceMode,
     109       624645 :     ) -> Self {
     110       624645 :         match mode {
     111       213047 :             VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
     112       213047 :                 AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
     113       213047 :             ),
     114       411598 :             VectoredReadCoalesceMode::Chunked(chunk_size) => {
     115       411598 :                 Self::Chunked(ChunkedVectoredReadBuilder::new(
     116       411598 :                     start_offset,
     117       411598 :                     end_offset,
     118       411598 :                     meta,
     119       411598 :                     max_read_size,
     120       411598 :                     chunk_size,
     121       411598 :                 ))
     122              :             }
     123              :         }
     124       624645 :     }
     125              : 
     126       503715 :     pub(crate) fn new(
     127       503715 :         start_offset: u64,
     128       503715 :         end_offset: u64,
     129       503715 :         meta: BlobMeta,
     130       503715 :         max_read_size: usize,
     131       503715 :         mode: VectoredReadCoalesceMode,
     132       503715 :     ) -> Self {
     133       503715 :         Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
     134       503715 :     }
     135              : 
     136       120930 :     pub(crate) fn new_streaming(
     137       120930 :         start_offset: u64,
     138       120930 :         end_offset: u64,
     139       120930 :         meta: BlobMeta,
     140       120930 :         mode: VectoredReadCoalesceMode,
     141       120930 :     ) -> Self {
     142       120930 :         Self::new_impl(start_offset, end_offset, meta, None, mode)
     143       120930 :     }
     144              : 
     145      7333970 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     146      7333970 :         match self {
     147      2444738 :             VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
     148      4889232 :             VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
     149              :         }
     150      7333970 :     }
     151              : 
     152       624645 :     pub(crate) fn build(self) -> VectoredRead {
     153       624645 :         match self {
     154       213047 :             VectoredReadBuilder::Adjacent(builder) => builder.build(),
     155       411598 :             VectoredReadBuilder::Chunked(builder) => builder.build(),
     156              :         }
     157       624645 :     }
     158              : 
     159      6380026 :     pub(crate) fn size(&self) -> usize {
     160      6380026 :         match self {
     161      2126676 :             VectoredReadBuilder::Adjacent(builder) => builder.size(),
     162      4253350 :             VectoredReadBuilder::Chunked(builder) => builder.size(),
     163              :         }
     164      6380026 :     }
     165              : }
     166              : 
     167              : pub(crate) struct AdjacentVectoredReadBuilder {
     168              :     /// Start offset of the read.
     169              :     start: u64,
     170              :     // End offset of the read.
     171              :     end: u64,
     172              :     /// Start offset and metadata for each blob in this read
     173              :     blobs_at: VecMap<u64, BlobMeta>,
     174              :     max_read_size: Option<usize>,
     175              : }
     176              : 
     177              : impl AdjacentVectoredReadBuilder {
     178              :     /// Start building a new vectored read.
     179              :     ///
     180              :     /// Note that by design, this does not check against reading more than `max_read_size` to
     181              :     /// support reading larger blobs than the configuration value. The builder will be single use
     182              :     /// however after that.
     183       213047 :     pub(crate) fn new(
     184       213047 :         start_offset: u64,
     185       213047 :         end_offset: u64,
     186       213047 :         meta: BlobMeta,
     187       213047 :         max_read_size: Option<usize>,
     188       213047 :     ) -> Self {
     189       213047 :         let mut blobs_at = VecMap::default();
     190       213047 :         blobs_at
     191       213047 :             .append(start_offset, meta)
     192       213047 :             .expect("First insertion always succeeds");
     193       213047 : 
     194       213047 :         Self {
     195       213047 :             start: start_offset,
     196       213047 :             end: end_offset,
     197       213047 :             blobs_at,
     198       213047 :             max_read_size,
     199       213047 :         }
     200       213047 :     }
     201              :     /// Attempt to extend the current read with a new blob if the start
     202              :     /// offset matches with the current end of the vectored read
     203              :     /// and the resulting size does not exceed the max read size
     204      2444738 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     205      2444738 :         tracing::trace!(start, end, "trying to extend");
     206      2444738 :         let size = (end - start) as usize;
     207      2444738 :         let not_limited_by_max_read_size = {
     208      2444738 :             if let Some(max_read_size) = self.max_read_size {
     209       357710 :                 self.size() + size <= max_read_size
     210              :             } else {
     211      2087028 :                 true
     212              :             }
     213              :         };
     214              : 
     215      2444738 :         if self.end == start && not_limited_by_max_read_size {
     216      2406418 :             self.end = end;
     217      2406418 :             self.blobs_at
     218      2406418 :                 .append(start, meta)
     219      2406418 :                 .expect("LSNs are ordered within vectored reads");
     220      2406418 : 
     221      2406418 :             return VectoredReadExtended::Yes;
     222        38320 :         }
     223        38320 : 
     224        38320 :         VectoredReadExtended::No
     225      2444738 :     }
     226              : 
     227      2484386 :     pub(crate) fn size(&self) -> usize {
     228      2484386 :         (self.end - self.start) as usize
     229      2484386 :     }
     230              : 
     231       213047 :     pub(crate) fn build(self) -> VectoredRead {
     232       213047 :         VectoredRead {
     233       213047 :             start: self.start,
     234       213047 :             end: self.end,
     235       213047 :             blobs_at: self.blobs_at,
     236       213047 :         }
     237       213047 :     }
     238              : }
     239              : 
     240              : pub(crate) struct ChunkedVectoredReadBuilder {
     241              :     /// Start block number
     242              :     start_blk_no: usize,
     243              :     /// End block number (exclusive).
     244              :     end_blk_no: usize,
     245              :     /// Start offset and metadata for each blob in this read
     246              :     blobs_at: VecMap<u64, BlobMeta>,
     247              :     max_read_size: Option<usize>,
     248              :     /// Chunk size reads are coalesced into.
     249              :     chunk_size: usize,
     250              : }
     251              : 
     252              : /// Computes x / d rounded up.
     253      5300848 : fn div_round_up(x: usize, d: usize) -> usize {
     254      5300848 :     (x + (d - 1)) / d
     255      5300848 : }
     256              : 
     257              : impl ChunkedVectoredReadBuilder {
     258              :     /// Start building a new vectored read.
     259              :     ///
     260              :     /// Note that by design, this does not check against reading more than `max_read_size` to
     261              :     /// support reading larger blobs than the configuration value. The builder will be single use
     262              :     /// however after that.
     263       411598 :     pub(crate) fn new(
     264       411598 :         start_offset: u64,
     265       411598 :         end_offset: u64,
     266       411598 :         meta: BlobMeta,
     267       411598 :         max_read_size: Option<usize>,
     268       411598 :         chunk_size: usize,
     269       411598 :     ) -> Self {
     270       411598 :         let mut blobs_at = VecMap::default();
     271       411598 :         blobs_at
     272       411598 :             .append(start_offset, meta)
     273       411598 :             .expect("First insertion always succeeds");
     274       411598 : 
     275       411598 :         let start_blk_no = start_offset as usize / chunk_size;
     276       411598 :         let end_blk_no = div_round_up(end_offset as usize, chunk_size);
     277       411598 :         Self {
     278       411598 :             start_blk_no,
     279       411598 :             end_blk_no,
     280       411598 :             blobs_at,
     281       411598 :             max_read_size,
     282       411598 :             chunk_size,
     283       411598 :         }
     284       411598 :     }
     285              : 
     286              :     /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
     287              :     ///
     288              :     /// The resulting size also must be below the max read size.
     289      4889232 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     290      4889232 :         tracing::trace!(start, end, "trying to extend");
     291      4889232 :         let start_blk_no = start as usize / self.chunk_size;
     292      4889232 :         let end_blk_no = div_round_up(end as usize, self.chunk_size);
     293              : 
     294      4889232 :         let not_limited_by_max_read_size = {
     295      4889232 :             if let Some(max_read_size) = self.max_read_size {
     296       715436 :                 let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
     297       715436 :                 coalesced_size <= max_read_size
     298              :             } else {
     299      4173796 :                 true
     300              :             }
     301              :         };
     302              : 
     303              :         // True if the second block starts in the same block or the immediate next block where the first block ended.
     304              :         //
     305              :         // Note: This automatically handles the case where two blocks are adjacent to each other,
     306              :             // whether they start on a chunk size boundary or not.
     307      4889232 :         let is_adjacent_chunk_read = {
     308              :             // 1. first.end & second.start are in the same block
     309      4889232 :             self.end_blk_no == start_blk_no + 1 ||
     310              :             // 2. first.end ends one block before second.start
     311      2457532 :             self.end_blk_no == start_blk_no
     312              :         };
     313              : 
     314      4889232 :         if is_adjacent_chunk_read && not_limited_by_max_read_size {
     315      4827352 :             self.end_blk_no = end_blk_no;
     316      4827352 :             self.blobs_at
     317      4827352 :                 .append(start, meta)
     318      4827352 :                 .expect("LSNs are ordered within vectored reads");
     319      4827352 : 
     320      4827352 :             return VectoredReadExtended::Yes;
     321        61880 :         }
     322        61880 : 
     323        61880 :         VectoredReadExtended::No
     324      4889232 :     }
     325              : 
     326      4253350 :     pub(crate) fn size(&self) -> usize {
     327      4253350 :         (self.end_blk_no - self.start_blk_no) * self.chunk_size
     328      4253350 :     }
     329              : 
     330       411598 :     pub(crate) fn build(self) -> VectoredRead {
     331       411598 :         let start = (self.start_blk_no * self.chunk_size) as u64;
     332       411598 :         let end = (self.end_blk_no * self.chunk_size) as u64;
     333       411598 :         VectoredRead {
     334       411598 :             start,
     335       411598 :             end,
     336       411598 :             blobs_at: self.blobs_at,
     337       411598 :         }
     338       411598 :     }
     339              : }
     340              : 
     341              : #[derive(Copy, Clone, Debug)]
     342              : pub enum BlobFlag {
     343              :     None,
     344              :     Ignore,
     345              :     ReplaceAll,
     346              : }
     347              : 
     348              : /// Planner for vectored blob reads.
     349              : ///
     350              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     351              : /// and coalesced into disk reads.
     352              : ///
     353              : /// The implementation is very simple:
     354              : /// * Collect all blob offsets in an ordered structure
     355              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     356              : pub struct VectoredReadPlanner {
     357              :     // Track all the blob offsets. Start offsets must be ordered.
     358              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     359              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     360              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     361              : 
     362              :     max_read_size: usize,
     363              : 
     364              :     mode: VectoredReadCoalesceMode,
     365              : }
     366              : 
     367              : impl VectoredReadPlanner {
     368       640302 :     pub fn new(max_read_size: usize) -> Self {
     369       640302 :         let mode = VectoredReadCoalesceMode::get();
     370       640302 :         Self {
     371       640302 :             blobs: BTreeMap::new(),
     372       640302 :             prev: None,
     373       640302 :             max_read_size,
     374       640302 :             mode,
     375       640302 :         }
     376       640302 :     }
     377              : 
     378              :     /// Include a new blob in the read plan.
     379              :     ///
     380              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     381              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     382              :     /// keys in a given keyspace. This function must be called for each key in the desired
     383              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     384              :     /// be called after every range in the offset.
     385              :     ///
     386              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     387              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     388              :     /// incorrect data to the user.
     389              :     ///
     390              :     /// The `flag` argument has two interesting values:
     391              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     392              :     ///   This is used for WAL records that `will_init`.
     393              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     394              :     ///   if the blob is cached.
     395      4238582 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     396              :         // Implementation note: internally lag behind by one blob such that
     397              :         // we have a start and end offset when initialising [`VectoredRead`]
     398      4238582 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     399              :             None => {
     400       411859 :                 self.prev = Some((key, lsn, offset, flag));
     401       411859 :                 return;
     402              :             }
     403      3826723 :             Some(prev) => prev,
     404      3826723 :         };
     405      3826723 : 
     406      3826723 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     407      3826723 : 
     408      3826723 :         self.prev = Some((key, lsn, offset, flag));
     409      4238582 :     }
     410              : 
     411       832262 :     pub fn handle_range_end(&mut self, offset: u64) {
     412       832262 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     413       411859 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     414       420403 :         }
     415              : 
     416       832262 :         self.prev = None;
     417       832262 :     }
     418              : 
     419      4238582 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     420      4238582 :         match flag {
     421      1014722 :             BlobFlag::None => {
     422      1014722 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     423      1014722 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     424      1014722 :             }
     425       771234 :             BlobFlag::ReplaceAll => {
     426       771234 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     427       771234 :                 blobs_for_key.clear();
     428       771234 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     429       771234 :             }
     430      2452626 :             BlobFlag::Ignore => {}
     431              :         }
     432      4238582 :     }
     433              : 
     434       640302 :     pub fn finish(self) -> Vec<VectoredRead> {
     435       640302 :         let mut current_read_builder: Option<VectoredReadBuilder> = None;
     436       640302 :         let mut reads = Vec::new();
     437              : 
     438      1951107 :         for (key, blobs_for_key) in self.blobs {
     439      2775034 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     440      1464229 :                 let extended = match &mut current_read_builder {
     441      1073080 :                     Some(read_builder) => {
     442      1073080 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     443              :                     }
     444       391149 :                     None => VectoredReadExtended::No,
     445              :                 };
     446              : 
     447      1464229 :                 if extended == VectoredReadExtended::No {
     448       491331 :                     let next_read_builder = VectoredReadBuilder::new(
     449       491331 :                         start_offset,
     450       491331 :                         end_offset,
     451       491331 :                         BlobMeta { key, lsn },
     452       491331 :                         self.max_read_size,
     453       491331 :                         self.mode,
     454       491331 :                     );
     455       491331 : 
     456       491331 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     457              : 
     458              :                     // `current_read_builder` is None in the first iteration of the outer loop
     459       491331 :                     if let Some(read_builder) = prev_read_builder {
     460       100182 :                         reads.push(read_builder.build());
     461       391149 :                     }
     462       972898 :                 }
     463              :             }
     464              :         }
     465              : 
     466       640302 :         if let Some(read_builder) = current_read_builder {
     467       391149 :             reads.push(read_builder.build());
     468       391149 :         }
     469              : 
     470       640302 :         reads
     471       640302 :     }
     472              : }
     473              : 
     474              : /// Disk reader for vectored blob spans (does not go through the page cache)
     475              : pub struct VectoredBlobReader<'a> {
     476              :     file: &'a VirtualFile,
     477              : }
     478              : 
     479              : impl<'a> VectoredBlobReader<'a> {
     480       761206 :     pub fn new(file: &'a VirtualFile) -> Self {
     481       761206 :         Self { file }
     482       761206 :     }
     483              : 
     484              :     /// Read the requested blobs into the buffer.
     485              :     ///
     486              :     /// We have to deal with the fact that blobs are not fixed size.
     487              :     /// Each blob is prefixed by a size header.
     488              :     ///
     489              :     /// The success return value is a struct which contains the buffer
     490              :     /// filled from disk and a list of offsets at which each blob lies
     491              :     /// in the buffer.
     492       624473 :     pub async fn read_blobs(
     493       624473 :         &self,
     494       624473 :         read: &VectoredRead,
     495       624473 :         buf: BytesMut,
     496       624473 :         ctx: &RequestContext,
     497       624473 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     498       624473 :         assert!(read.size() > 0);
     499       624473 :         assert!(
     500       624473 :             read.size() <= buf.capacity(),
     501            0 :             "{} > {}",
     502            0 :             read.size(),
     503            0 :             buf.capacity()
     504              :         );
     505              : 
     506       624473 :         if cfg!(debug_assertions) {
     507       624473 :             let align = virtual_file::get_io_buffer_alignment() as u64;
     508       624473 :             debug_assert_eq!(
     509       624473 :                 read.start % align,
     510              :                 0,
     511            0 :                 "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
     512              :                 read.start,
     513              :                 align
     514              :             );
     515            0 :         }
     516              : 
     517       624473 :         let mut buf = self
     518       624473 :             .file
     519       624473 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     520       317895 :             .await?
     521       624473 :             .into_inner();
     522       624473 : 
     523       624473 :         let blobs_at = read.blobs_at.as_slice();
     524       624473 : 
     525       624473 :         let start_offset = read.start;
     526       624473 : 
     527       624473 :         let mut metas = Vec::with_capacity(blobs_at.len());
     528       624473 :         // Blobs in `read` only provide their starting offset. The end offset
     529       624473 :         // of a blob is implicit: the start of the next blob if one exists
     530       624473 :         // or the end of the read.
     531       624473 : 
     532       624473 :         // Some scratch space, put here for reusing the allocation
     533       624473 :         let mut decompressed_vec = Vec::new();
     534              : 
     535      8482588 :         for (blob_start, meta) in blobs_at {
     536      7858115 :             let blob_start_in_buf = blob_start - start_offset;
     537      7858115 :             let first_len_byte = buf[blob_start_in_buf as usize];
     538              : 
     539              :             // Each blob is prefixed by a header containing its size and compression information.
     540              :             // Extract the size and skip that header to find the start of the data.
     541              :             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
     542              :             // 1 byte case and 1 in the 4 byte case.
     543      7858115 :             let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
     544      7745039 :                 (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
     545              :             } else {
     546       113076 :                 let mut blob_size_buf = [0u8; 4];
     547       113076 :                 let offset_in_buf = blob_start_in_buf as usize;
     548       113076 : 
     549       113076 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     550       113076 :                 blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
     551       113076 : 
     552       113076 :                 let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     553       113076 :                 (
     554       113076 :                     4,
     555       113076 :                     u32::from_be_bytes(blob_size_buf) as u64,
     556       113076 :                     compression_bits,
     557       113076 :                 )
     558              :             };
     559              : 
     560      7858115 :             let start_raw = blob_start_in_buf + size_length;
     561      7858115 :             let end_raw = start_raw + blob_size;
     562      7858115 :             let (start, end);
     563      7858115 :             if compression_bits == BYTE_UNCOMPRESSED {
     564      7858109 :                 start = start_raw as usize;
     565      7858109 :                 end = end_raw as usize;
     566      7858109 :             } else if compression_bits == BYTE_ZSTD {
     567            6 :                 let mut decoder =
     568            6 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     569            6 :                 decoder
     570            6 :                     .write_all(&buf[start_raw as usize..end_raw as usize])
     571            0 :                     .await?;
     572            6 :                 decoder.flush().await?;
     573            6 :                 start = buf.len();
     574            6 :                 buf.extend_from_slice(&decompressed_vec);
     575            6 :                 end = buf.len();
     576            6 :                 decompressed_vec.clear();
     577              :             } else {
     578            0 :                 let error = std::io::Error::new(
     579            0 :                     std::io::ErrorKind::InvalidData,
     580            0 :                     format!("invalid compression byte {compression_bits:x}"),
     581            0 :                 );
     582            0 :                 return Err(error);
     583              :             }
     584              : 
     585      7858115 :             metas.push(VectoredBlob {
     586      7858115 :                 start,
     587      7858115 :                 end,
     588      7858115 :                 meta: *meta,
     589      7858115 :             });
     590              :         }
     591              : 
     592       624473 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     593       624473 :     }
     594              : }
     595              : 
     596              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     597              : ///
     598              : /// It provides a streaming API for getting read blobs. A batch is returned from
     599              : /// `handle` or `handle_range_end` once the pending read reaches `max_read_size`
     600              : /// or holds `max_cnt` blobs, or when the end of the range is signalled.
     601              : pub struct StreamingVectoredReadPlanner {
     602              :     read_builder: Option<VectoredReadBuilder>,
     603              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     604              :     prev: Option<(Key, Lsn, u64)>,
     605              :     /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
     606              :     /// we will produce a single batch instead of splitting them.
     607              :     max_read_size: u64,
     608              :     /// Max item count per batch
     609              :     max_cnt: usize,
     610              :     /// Number of blobs in the current batch
     611              :     cnt: usize,
     612              : 
     613              :     mode: VectoredReadCoalesceMode,
     614              : }
     615              : 
     616              : impl StreamingVectoredReadPlanner {
     617         2238 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     618         2238 :         assert!(max_cnt > 0);
     619         2238 :         assert!(max_read_size > 0);
     620         2238 :         let mode = VectoredReadCoalesceMode::get();
     621         2238 :         Self {
     622         2238 :             read_builder: None,
     623         2238 :             prev: None,
     624         2238 :             max_cnt,
     625         2238 :             max_read_size,
     626         2238 :             cnt: 0,
     627         2238 :             mode,
     628         2238 :         }
     629         2238 :     }
     630              : 
     631      6381922 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
     632              :         // Implementation note: internally lag behind by one blob such that
     633              :         // we have a start and end offset when initialising [`VectoredRead`]
     634      6381922 :         let (prev_key, prev_lsn, prev_offset) = match self.prev {
     635              :             None => {
     636         1896 :                 self.prev = Some((key, lsn, offset));
     637         1896 :                 return None;
     638              :             }
     639      6380026 :             Some(prev) => prev,
     640      6380026 :         };
     641      6380026 : 
     642      6380026 :         let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
     643      6380026 : 
     644      6380026 :         self.prev = Some((key, lsn, offset));
     645      6380026 : 
     646      6380026 :         res
     647      6381922 :     }
     648              : 
     649         1734 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     650         1734 :         let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
     651         1728 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
     652              :         } else {
     653            6 :             None
     654              :         };
     655              : 
     656         1734 :         self.prev = None;
     657         1734 : 
     658         1734 :         res
     659         1734 :     }
     660              : 
     661      6381754 :     fn add_blob(
     662      6381754 :         &mut self,
     663      6381754 :         key: Key,
     664      6381754 :         lsn: Lsn,
     665      6381754 :         start_offset: u64,
     666      6381754 :         end_offset: u64,
     667      6381754 :         is_last_blob_in_read: bool,
     668      6381754 :     ) -> Option<VectoredRead> {
     669      6381754 :         match &mut self.read_builder {
     670      6260824 :             Some(read_builder) => {
     671      6260824 :                 let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
     672      6260824 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     673              :             }
     674       120930 :             None => {
     675       120930 :                 self.read_builder = {
     676       120930 :                     Some(VectoredReadBuilder::new_streaming(
     677       120930 :                         start_offset,
     678       120930 :                         end_offset,
     679       120930 :                         BlobMeta { key, lsn },
     680       120930 :                         self.mode,
     681       120930 :                     ))
     682       120930 :                 };
     683       120930 :             }
     684              :         }
     685      6381754 :         let read_builder = self.read_builder.as_mut().unwrap();
     686      6381754 :         self.cnt += 1;
     687      6381754 :         if is_last_blob_in_read
     688      6380026 :             || read_builder.size() >= self.max_read_size as usize
     689      6295054 :             || self.cnt >= self.max_cnt
     690              :         {
     691       120930 :             let prev_read_builder = self.read_builder.take();
     692       120930 :             self.cnt = 0;
     693              : 
     694              :             // `prev_read_builder` is always `Some` here: the builder was unwrapped just above
     695       120930 :             if let Some(read_builder) = prev_read_builder {
     696       120930 :                 return Some(read_builder.build());
     697            0 :             }
     698      6260824 :         }
     699      6260824 :         None
     700      6381754 :     }
     701              : }
     702              : 
     703              : #[cfg(test)]
     704              : mod tests {
     705              :     use anyhow::Error;
     706              : 
     707              :     use crate::context::DownloadBehavior;
     708              :     use crate::page_cache::PAGE_SZ;
     709              :     use crate::task_mgr::TaskKind;
     710              : 
     711              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     712              :     use super::*;
     713              : 
     714          132 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     715          132 :         let align = virtual_file::get_io_buffer_alignment() as u64;
     716          132 :         assert_eq!(read.start % align, 0);
     717          132 :         assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
     718              : 
     719          216 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     720          132 : 
     721          132 :         let offsets_in_read: Vec<_> = read
     722          132 :             .blobs_at
     723          132 :             .as_slice()
     724          132 :             .iter()
     725          216 :             .map(|(offset, _)| *offset)
     726          132 :             .collect();
     727          132 : 
     728          132 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     729          132 :     }
     730              : 
     731              :     #[test]
     732            6 :     fn planner_chunked_coalesce_all_test() {
     733              :         use crate::virtual_file;
     734              : 
     735            6 :         let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
     736            6 : 
     737            6 :         // The test explicitly does not check chunk size < 512
     738            6 :         if chunk_size < 512 {
     739            4 :             return;
     740            2 :         }
     741            2 : 
     742            2 :         let max_read_size = chunk_size as usize * 8;
     743            2 :         let key = Key::MIN;
     744            2 :         let lsn = Lsn(0);
     745            2 : 
     746            2 :         let blob_descriptions = [
     747            2 :             (key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
     748            2 :             (key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
     749            2 :             (key, lsn, chunk_size / 2, BlobFlag::None),
     750            2 :             (key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
     751            2 :             (key, lsn, chunk_size, BlobFlag::None),
     752            2 :             (key, lsn, chunk_size * 2 - 1, BlobFlag::None),
     753            2 :             (key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
     754            2 :             (key, lsn, chunk_size * 3 + 1, BlobFlag::None),
     755            2 :             (key, lsn, chunk_size * 5 + 1, BlobFlag::None),
     756            2 :             (key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
     757            2 :             (key, lsn, chunk_size * 7 + 1, BlobFlag::None),
     758            2 :             (key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
     759            2 :             (key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
     760            2 :             (key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
     761            2 :         ];
     762            2 : 
     763            2 :         let ranges = [
     764            2 :             &[
     765            2 :                 blob_descriptions[0],
     766            2 :                 blob_descriptions[2],
     767            2 :                 blob_descriptions[4],
     768            2 :                 blob_descriptions[5],
     769            2 :                 blob_descriptions[7],
     770            2 :                 blob_descriptions[8],
     771            2 :                 blob_descriptions[10],
     772            2 :             ],
     773            2 :             &blob_descriptions[11..12],
     774            2 :             &blob_descriptions[13..],
     775            2 :         ];
     776            2 : 
     777            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     778           30 :         for (key, lsn, offset, flag) in blob_descriptions {
     779           28 :             planner.handle(key, lsn, offset, flag);
     780           28 :         }
     781              : 
     782            2 :         planner.handle_range_end(652 * 1024);
     783            2 : 
     784            2 :         let reads = planner.finish();
     785            2 : 
     786            2 :         assert_eq!(reads.len(), ranges.len());
     787              : 
     788            6 :         for (idx, read) in reads.iter().enumerate() {
     789            6 :             validate_read(read, ranges[idx]);
     790            6 :         }
     791            6 :     }
     792              : 
     793              :     #[test]
     794            6 :     fn planner_max_read_size_test() {
     795            6 :         let max_read_size = 128 * 1024;
     796            6 :         let key = Key::MIN;
     797            6 :         let lsn = Lsn(0);
     798            6 : 
     799            6 :         let blob_descriptions = vec![
     800            6 :             (key, lsn, 0, BlobFlag::None),
     801            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     802            6 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     803            6 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     804            6 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     805            6 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     806            6 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     807            6 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     808            6 :         ];
     809            6 : 
     810            6 :         let ranges = [
     811            6 :             &blob_descriptions[0..3],
     812            6 :             &blob_descriptions[3..4],
     813            6 :             &blob_descriptions[4..5],
     814            6 :             &blob_descriptions[5..6],
     815            6 :             &blob_descriptions[6..7],
     816            6 :             &blob_descriptions[7..],
     817            6 :         ];
     818            6 : 
     819            6 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     820           48 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     821           48 :             planner.handle(key, lsn, offset, flag);
     822           48 :         }
     823              : 
     824            6 :         planner.handle_range_end(652 * 1024);
     825            6 : 
     826            6 :         let reads = planner.finish();
     827            6 : 
     828            6 :         assert_eq!(reads.len(), 6);
     829              : 
     830              :         // TODO: could remove zero reads to produce 5 reads here
     831              : 
     832           36 :         for (idx, read) in reads.iter().enumerate() {
     833           36 :             validate_read(read, ranges[idx]);
     834           36 :         }
     835            6 :     }
     836              : 
     837              :     #[test]
     838            6 :     fn planner_replacement_test() {
     839            6 :         let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
     840            6 :         let max_read_size = 128 * chunk_size as usize;
     841            6 :         let first_key = Key::MIN;
     842            6 :         let second_key = first_key.next();
     843            6 :         let lsn = Lsn(0);
     844            6 : 
     845            6 :         let blob_descriptions = vec![
     846            6 :             (first_key, lsn, 0, BlobFlag::None),          // First in read 1
     847            6 :             (first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
     848            6 :             (second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
     849            6 :             (second_key, lsn, 3 * chunk_size, BlobFlag::None),
     850            6 :             (second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
     851            6 :             (second_key, lsn, 5 * chunk_size, BlobFlag::None),       // Last in read 2
     852            6 :         ];
     853            6 : 
     854            6 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     855            6 : 
     856            6 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     857           36 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     858           36 :             planner.handle(key, lsn, offset, flag);
     859           36 :         }
     860              : 
     861            6 :         planner.handle_range_end(6 * chunk_size);
     862            6 : 
     863            6 :         let reads = planner.finish();
     864            6 :         assert_eq!(reads.len(), 2);
     865              : 
     866           12 :         for (idx, read) in reads.iter().enumerate() {
     867           12 :             validate_read(read, ranges[idx]);
     868           12 :         }
     869            6 :     }
     870              : 
     871              :     #[test]
     872            6 :     fn streaming_planner_max_read_size_test() {
     873            6 :         let max_read_size = 128 * 1024;
     874            6 :         let key = Key::MIN;
     875            6 :         let lsn = Lsn(0);
     876            6 : 
     877            6 :         let blob_descriptions = vec![
     878            6 :             (key, lsn, 0, BlobFlag::None),
     879            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     880            6 :             (key, lsn, 96 * 1024, BlobFlag::None),
     881            6 :             (key, lsn, 128 * 1024, BlobFlag::None),
     882            6 :             (key, lsn, 198 * 1024, BlobFlag::None),
     883            6 :             (key, lsn, 268 * 1024, BlobFlag::None),
     884            6 :             (key, lsn, 396 * 1024, BlobFlag::None),
     885            6 :             (key, lsn, 652 * 1024, BlobFlag::None),
     886            6 :         ];
     887            6 : 
     888            6 :         let ranges = [
     889            6 :             &blob_descriptions[0..3],
     890            6 :             &blob_descriptions[3..5],
     891            6 :             &blob_descriptions[5..6],
     892            6 :             &blob_descriptions[6..7],
     893            6 :             &blob_descriptions[7..],
     894            6 :         ];
     895            6 : 
     896            6 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     897            6 :         let mut reads = Vec::new();
     898           48 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     899           48 :             reads.extend(planner.handle(key, lsn, offset));
     900           48 :         }
     901            6 :         reads.extend(planner.handle_range_end(652 * 1024));
     902            6 : 
     903            6 :         assert_eq!(reads.len(), ranges.len());
     904              : 
     905           30 :         for (idx, read) in reads.iter().enumerate() {
     906           30 :             validate_read(read, ranges[idx]);
     907           30 :         }
     908            6 :     }
     909              : 
     910              :     #[test]
     911            6 :     fn streaming_planner_max_cnt_test() {
     912            6 :         let max_read_size = 1024 * 1024;
     913            6 :         let key = Key::MIN;
     914            6 :         let lsn = Lsn(0);
     915            6 : 
     916            6 :         let blob_descriptions = vec![
     917            6 :             (key, lsn, 0, BlobFlag::None),
     918            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     919            6 :             (key, lsn, 96 * 1024, BlobFlag::None),
     920            6 :             (key, lsn, 128 * 1024, BlobFlag::None),
     921            6 :             (key, lsn, 198 * 1024, BlobFlag::None),
     922            6 :             (key, lsn, 268 * 1024, BlobFlag::None),
     923            6 :             (key, lsn, 396 * 1024, BlobFlag::None),
     924            6 :             (key, lsn, 652 * 1024, BlobFlag::None),
     925            6 :         ];
     926            6 : 
     927            6 :         let ranges = [
     928            6 :             &blob_descriptions[0..2],
     929            6 :             &blob_descriptions[2..4],
     930            6 :             &blob_descriptions[4..6],
     931            6 :             &blob_descriptions[6..],
     932            6 :         ];
     933            6 : 
     934            6 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     935            6 :         let mut reads = Vec::new();
     936           48 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     937           48 :             reads.extend(planner.handle(key, lsn, offset));
     938           48 :         }
     939            6 :         reads.extend(planner.handle_range_end(652 * 1024));
     940            6 : 
     941            6 :         assert_eq!(reads.len(), ranges.len());
     942              : 
     943           24 :         for (idx, read) in reads.iter().enumerate() {
     944           24 :             validate_read(read, ranges[idx]);
     945           24 :         }
     946            6 :     }
     947              : 
     948              :     #[test]
     949            6 :     fn streaming_planner_edge_test() {
     950            6 :         let max_read_size = 1024 * 1024;
     951            6 :         let key = Key::MIN;
     952            6 :         let lsn = Lsn(0);
     953            6 :         {
     954            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     955            6 :             let mut reads = Vec::new();
     956            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     957            6 :             assert!(reads.is_empty());
     958              :         }
     959              :         {
     960            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     961            6 :             let mut reads = Vec::new();
     962            6 :             reads.extend(planner.handle(key, lsn, 0));
     963            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     964            6 :             assert_eq!(reads.len(), 1);
     965            6 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     966            6 :         }
     967            6 :         {
     968            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     969            6 :             let mut reads = Vec::new();
     970            6 :             reads.extend(planner.handle(key, lsn, 0));
     971            6 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     972            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     973            6 :             assert_eq!(reads.len(), 2);
     974            6 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     975            6 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     976            6 :         }
     977            6 :         {
     978            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     979            6 :             let mut reads = Vec::new();
     980            6 :             reads.extend(planner.handle(key, lsn, 0));
     981            6 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     982            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     983            6 :             assert_eq!(reads.len(), 1);
     984            6 :             validate_read(
     985            6 :                 &reads[0],
     986            6 :                 &[
     987            6 :                     (key, lsn, 0, BlobFlag::None),
     988            6 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     989            6 :                 ],
     990            6 :             );
     991            6 :         }
     992            6 :     }
     993              : 
     994           24 :     async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
     995           24 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     996           24 :         let (_temp_dir, pathbuf, offsets) =
     997          837 :             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
     998              : 
     999           24 :         let file = VirtualFile::open(&pathbuf, &ctx).await?;
    1000           24 :         let file_len = std::fs::metadata(&pathbuf)?.len();
    1001           24 : 
    1002           24 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
    1003        12360 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
    1004           24 :         let mut buf = BytesMut::with_capacity(reserved_bytes);
    1005           24 : 
    1006           24 :         let mode = VectoredReadCoalesceMode::get();
    1007           24 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
    1008           24 :         let meta = BlobMeta {
    1009           24 :             key: Key::MIN,
    1010           24 :             lsn: Lsn(0),
    1011           24 :         };
    1012              : 
    1013        12360 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
    1014        12360 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
    1015        12360 :             if idx + 1 == offsets.len() {
    1016           24 :                 continue;
    1017        12336 :             }
    1018        12336 :             let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
    1019        12336 :             let read = read_builder.build();
    1020        12336 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
    1021        12336 :             assert_eq!(result.blobs.len(), 1);
    1022        12336 :             let read_blob = &result.blobs[0];
    1023        12336 :             let read_buf = &result.buf[read_blob.start..read_blob.end];
    1024        12336 :             assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
    1025        12336 :             buf = result.buf;
    1026              :         }
    1027           24 :         Ok(())
    1028           24 :     }
    1029              : 
    1030              :     #[tokio::test]
    1031            6 :     async fn test_really_big_array() -> Result<(), Error> {
    1032            6 :         let blobs = &[
    1033            6 :             b"test".to_vec(),
    1034            6 :             random_array(10 * PAGE_SZ),
    1035            6 :             b"hello".to_vec(),
    1036            6 :             random_array(66 * PAGE_SZ),
    1037            6 :             vec![0xf3; 24 * PAGE_SZ],
    1038            6 :             b"foobar".to_vec(),
    1039            6 :         ];
    1040           42 :         round_trip_test_compressed(blobs, false).await?;
    1041           33 :         round_trip_test_compressed(blobs, true).await?;
    1042            6 :         Ok(())
    1043            6 :     }
    1044              : 
    1045              :     #[tokio::test]
    1046            6 :     async fn test_arrays_inc() -> Result<(), Error> {
    1047            6 :         let blobs = (0..PAGE_SZ / 8)
    1048         6144 :             .map(|v| random_array(v * 16))
    1049            6 :             .collect::<Vec<_>>();
    1050         3516 :         round_trip_test_compressed(&blobs, false).await?;
    1051         3516 :         round_trip_test_compressed(&blobs, true).await?;
    1052            6 :         Ok(())
    1053            6 :     }
    1054              : 
    1055              :     #[test]
    1056            6 :     fn test_div_round_up() {
    1057              :         const CHUNK_SIZE: usize = 512;
    1058            6 :         assert_eq!(1, div_round_up(200, CHUNK_SIZE));
    1059            6 :         assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
    1060            6 :         assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
    1061            6 :     }
    1062              : }
        

Generated by: LCOV version 2.1-beta