// LCOV coverage report for pageserver/src/tenant/vectored_blob_io.rs
// (test 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info, 2024-08-29 11:33:10):
// lines: 98.3% (682/694), functions: 100.0% (47/47)

//!
//! Utilities for vectored reading of variable-sized "blobs".
//!
//! The "blob" API is an abstraction on top of the "block" API,
//! with the main difference being that blobs do not have a fixed
//! size (each blob is prefixed with a 1 or 4 byte length field).
//!
//! The vectored APIs provided in this module allow for planning
//! and executing disk IO which covers multiple blobs.
//!
//! Reads are planned with [`VectoredReadPlanner`], which coalesces
//! adjacent blocks into a single disk IO request, and executed by
//! [`VectoredBlobReader`], which does all the required offset juggling
//! and returns a buffer housing all the blobs and a list of offsets.
//!
//! Note that the vectored blob API does *not* go through the page cache.
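//!
//! A minimal sketch of the intended call sequence (hypothetical offsets; `key`,
//! `lsn`, a `VirtualFile` in `file`, a `RequestContext` in `ctx` and a mutable,
//! pre-sized `BytesMut` in `buf` are assumed to be in scope):
//!
//! ```ignore
//! let mut planner = VectoredReadPlanner::new(128 * 1024);
//! // Feed blob offsets discovered while visiting a layer's index.
//! planner.handle(key, lsn, 0, BlobFlag::None);
//! planner.handle(key, lsn, 512, BlobFlag::None);
//! planner.handle_range_end(1024);
//!
//! let reader = VectoredBlobReader::new(&file);
//! for read in planner.finish() {
//!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
//!     for blob in &blobs_buf.blobs {
//!         let data = &blobs_buf.buf[blob.start..blob.end];
//!         // ... use `data` for `blob.meta.key` at `blob.meta.lsn`
//!     }
//!     buf = blobs_buf.buf; // recycle the allocation for the next read
//! }
//! ```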

use std::collections::BTreeMap;
use std::num::NonZeroUsize;

use bytes::BytesMut;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;

use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::{self, VirtualFile};

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);

/// Metadata bundled with the start and end offset of a blob.
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
    pub key: Key,
    pub lsn: Lsn,
}

/// Blob offsets into [`VectoredBlobsBuf::buf`]
pub struct VectoredBlob {
    pub start: usize,
    pub end: usize,
    pub meta: BlobMeta,
}

/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
    /// Buffer for all blobs in this read
    pub buf: BytesMut,
    /// Offsets into the buffer and metadata for all blobs in this read
    pub blobs: Vec<VectoredBlob>,
}

/// Description of one disk read for multiple blobs.
/// Used as the argument for [`VectoredBlobReader::read_blobs`]
#[derive(Debug)]
pub struct VectoredRead {
    pub start: u64,
    pub end: u64,
    /// Start offset and metadata for each blob in this read
    pub blobs_at: VecMap<u64, BlobMeta>,
}

impl VectoredRead {
    pub(crate) fn size(&self) -> usize {
        (self.end - self.start) as usize
    }
}

#[derive(Eq, PartialEq, Debug)]
pub(crate) enum VectoredReadExtended {
    Yes,
    No,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum VectoredReadCoalesceMode {
    /// Only coalesce exactly adjacent reads.
    AdjacentOnly,
    /// In addition to adjacent reads, also consider reads whose corresponding
    /// `end` and `start` offsets reside in the same chunk.
    Chunked(usize),
}

impl VectoredReadCoalesceMode {
    /// [`AdjacentVectoredReadBuilder`] is used if the alignment requirement is 0,
    /// whereas [`ChunkedVectoredReadBuilder`] is used for an alignment requirement of 1 or higher.
    pub(crate) fn get() -> Self {
        let align = virtual_file::get_io_buffer_alignment_raw();
        if align == DEFAULT_IO_BUFFER_ALIGNMENT {
            VectoredReadCoalesceMode::AdjacentOnly
        } else {
            VectoredReadCoalesceMode::Chunked(align)
        }
    }
}

pub(crate) enum VectoredReadBuilder {
    Adjacent(AdjacentVectoredReadBuilder),
    Chunked(ChunkedVectoredReadBuilder),
}

impl VectoredReadBuilder {
    fn new_impl(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        max_read_size: Option<usize>,
        mode: VectoredReadCoalesceMode,
    ) -> Self {
        match mode {
            VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
                AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
            ),
            VectoredReadCoalesceMode::Chunked(chunk_size) => {
                Self::Chunked(ChunkedVectoredReadBuilder::new(
                    start_offset,
                    end_offset,
                    meta,
                    max_read_size,
                    chunk_size,
                ))
            }
        }
    }

    pub(crate) fn new(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        max_read_size: usize,
        mode: VectoredReadCoalesceMode,
    ) -> Self {
        Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
    }

    pub(crate) fn new_streaming(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        mode: VectoredReadCoalesceMode,
    ) -> Self {
        Self::new_impl(start_offset, end_offset, meta, None, mode)
    }

    pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
        match self {
            VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
            VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
        }
    }

    pub(crate) fn build(self) -> VectoredRead {
        match self {
            VectoredReadBuilder::Adjacent(builder) => builder.build(),
            VectoredReadBuilder::Chunked(builder) => builder.build(),
        }
    }

    pub(crate) fn size(&self) -> usize {
        match self {
            VectoredReadBuilder::Adjacent(builder) => builder.size(),
            VectoredReadBuilder::Chunked(builder) => builder.size(),
        }
    }
}

pub(crate) struct AdjacentVectoredReadBuilder {
    /// Start offset of the read.
    start: u64,
    /// End offset of the read.
    end: u64,
    /// Start offset and metadata for each blob in this read
    blobs_at: VecMap<u64, BlobMeta>,
    max_read_size: Option<usize>,
}

impl AdjacentVectoredReadBuilder {
    /// Start building a new vectored read.
    ///
    /// Note that by design, this does not check against reading more than `max_read_size` to
    /// support reading blobs larger than the configuration value. However, the builder is
    /// single-use after that.
    pub(crate) fn new(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        max_read_size: Option<usize>,
    ) -> Self {
        let mut blobs_at = VecMap::default();
        blobs_at
            .append(start_offset, meta)
            .expect("First insertion always succeeds");

        Self {
            start: start_offset,
            end: end_offset,
            blobs_at,
            max_read_size,
        }
    }
    /// Attempt to extend the current read with a new blob if the new blob's
    /// start offset matches the current end of the vectored read
    /// and the resulting size stays below the max read size.
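    ///
    /// A sketch of the coalescing behaviour (hypothetical offsets; `meta` stands
    /// for some [`BlobMeta`]):
    ///
    /// ```ignore
    /// let mut builder = AdjacentVectoredReadBuilder::new(0, 100, meta, Some(1024));
    /// assert_eq!(builder.extend(100, 200, meta), VectoredReadExtended::Yes); // exactly adjacent
    /// assert_eq!(builder.extend(250, 300, meta), VectoredReadExtended::No); // gap at [200, 250)
    /// ```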
    pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
        tracing::trace!(start, end, "trying to extend");
        let size = (end - start) as usize;
        let not_limited_by_max_read_size = {
            if let Some(max_read_size) = self.max_read_size {
                self.size() + size <= max_read_size
            } else {
                true
            }
        };

        if self.end == start && not_limited_by_max_read_size {
            self.end = end;
            self.blobs_at
                .append(start, meta)
                .expect("LSNs are ordered within vectored reads");

            return VectoredReadExtended::Yes;
        }

        VectoredReadExtended::No
    }

    pub(crate) fn size(&self) -> usize {
        (self.end - self.start) as usize
    }

    pub(crate) fn build(self) -> VectoredRead {
        VectoredRead {
            start: self.start,
            end: self.end,
            blobs_at: self.blobs_at,
        }
    }
}

pub(crate) struct ChunkedVectoredReadBuilder {
    /// Start block number
    start_blk_no: usize,
    /// End block number (exclusive).
    end_blk_no: usize,
    /// Start offset and metadata for each blob in this read
    blobs_at: VecMap<u64, BlobMeta>,
    max_read_size: Option<usize>,
    /// Chunk size reads are coalesced into.
    chunk_size: usize,
}

/// Computes x / d rounded up.
fn div_round_up(x: usize, d: usize) -> usize {
    (x + (d - 1)) / d
}
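
// NB: equivalent to `usize::div_ceil`, which is stable since Rust 1.73.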

impl ChunkedVectoredReadBuilder {
    /// Start building a new vectored read.
    ///
    /// Note that by design, this does not check against reading more than `max_read_size` to
    /// support reading blobs larger than the configuration value. However, the builder is
    /// single-use after that.
    pub(crate) fn new(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        max_read_size: Option<usize>,
        chunk_size: usize,
    ) -> Self {
        let mut blobs_at = VecMap::default();
        blobs_at
            .append(start_offset, meta)
            .expect("First insertion always succeeds");

        let start_blk_no = start_offset as usize / chunk_size;
        let end_blk_no = div_round_up(end_offset as usize, chunk_size);
        Self {
            start_blk_no,
            end_blk_no,
            blobs_at,
            max_read_size,
            chunk_size,
        }
    }

    /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediately following chunk.
    ///
    /// The resulting size must also be below the max read size.
    pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
        tracing::trace!(start, end, "trying to extend");
        let start_blk_no = start as usize / self.chunk_size;
        let end_blk_no = div_round_up(end as usize, self.chunk_size);

        let not_limited_by_max_read_size = {
            if let Some(max_read_size) = self.max_read_size {
                let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
                coalesced_size <= max_read_size
            } else {
                true
            }
        };

        // True if the new blob starts either in the chunk where the current read ends
        // or in the chunk immediately after it.
        //
        // Note: this automatically handles the case where two blobs are exactly adjacent,
        // whether or not they start on a chunk size boundary.
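        //
        // Worked example with chunk_size = 512: a read ending at offset 700 has
        // end_blk_no = div_round_up(700, 512) = 2. A new blob starting at 800
        // (start_blk_no = 1) hits case 1 below; one starting at 1024
        // (start_blk_no = 2) hits case 2; one starting at 1536 (start_blk_no = 3)
        // matches neither and the read is not extended.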
        let is_adjacent_chunk_read = {
            // 1. first.end and second.start fall in the same chunk
            self.end_blk_no == start_blk_no + 1 ||
            // 2. second starts in the chunk right after the one in which first ends
            self.end_blk_no == start_blk_no
        };

        if is_adjacent_chunk_read && not_limited_by_max_read_size {
            self.end_blk_no = end_blk_no;
            self.blobs_at
                .append(start, meta)
                .expect("LSNs are ordered within vectored reads");

            return VectoredReadExtended::Yes;
        }

        VectoredReadExtended::No
    }

    pub(crate) fn size(&self) -> usize {
        (self.end_blk_no - self.start_blk_no) * self.chunk_size
    }

    pub(crate) fn build(self) -> VectoredRead {
        let start = (self.start_blk_no * self.chunk_size) as u64;
        let end = (self.end_blk_no * self.chunk_size) as u64;
        VectoredRead {
            start,
            end,
            blobs_at: self.blobs_at,
        }
    }
}

#[derive(Copy, Clone, Debug)]
pub enum BlobFlag {
    None,
    Ignore,
    ReplaceAll,
}

/// Planner for vectored blob reads.
///
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
/// and coalesced into disk reads.
///
/// The implementation is very simple:
/// * Collect all blob offsets in an ordered structure
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
    // Track all the blob offsets. Start offsets must be ordered.
    blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
    // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
    prev: Option<(Key, Lsn, u64, BlobFlag)>,

    max_read_size: usize,

    mode: VectoredReadCoalesceMode,
}

impl VectoredReadPlanner {
    pub fn new(max_read_size: usize) -> Self {
        let mode = VectoredReadCoalesceMode::get();
        Self {
            blobs: BTreeMap::new(),
            prev: None,
            max_read_size,
            mode,
        }
    }

    /// Include a new blob in the read plan.
    ///
    /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
    /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
    /// keys in a given keyspace. This function must be called for each key in the desired
    /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
    /// be called after every range in the keyspace.
    ///
    /// In the event that keys are skipped, the behaviour is undefined and can lead to an
    /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
    /// incorrect data to the user.
    ///
    /// The `flag` argument has two interesting values:
    /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
    ///   This is used for WAL records that `will_init`.
    /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
    ///   if the blob is cached.
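    ///
    /// A sketch of the flag semantics (hypothetical offsets; `planner` is a
    /// [`VectoredReadPlanner`]):
    ///
    /// ```ignore
    /// // A delta record for `key`:
    /// planner.handle(key, Lsn(0x10), 0, BlobFlag::None);
    /// // A `will_init` record: drops everything collected for `key` so far.
    /// planner.handle(key, Lsn(0x20), 512, BlobFlag::ReplaceAll);
    /// // A cached blob: its offset still ends the previous blob,
    /// // but it is not itself read from disk.
    /// planner.handle(key, Lsn(0x30), 1024, BlobFlag::Ignore);
    /// planner.handle_range_end(2048);
    /// ```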
    pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
        // Implementation note: internally lag behind by one blob such that
        // we have a start and end offset when initialising [`VectoredRead`]
        let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
            None => {
                self.prev = Some((key, lsn, offset, flag));
                return;
            }
            Some(prev) => prev,
        };

        self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);

        self.prev = Some((key, lsn, offset, flag));
    }

    pub fn handle_range_end(&mut self, offset: u64) {
        if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
            self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
        }

        self.prev = None;
    }

     419       419860 :         }
     420              : 
     421       831098 :         self.prev = None;
     422       831098 :     }
     423              : 
     424      4238286 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     425      4238286 :         match flag {
     426      1014908 :             BlobFlag::None => {
     427      1014908 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     428      1014908 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     429      1014908 :             }
     430       770620 :             BlobFlag::ReplaceAll => {
     431       770620 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     432       770620 :                 blobs_for_key.clear();
     433       770620 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     434       770620 :             }
     435      2452758 :             BlobFlag::Ignore => {}
     436              :         }
     437      4238286 :     }
     438              : 
    pub fn finish(self) -> Vec<VectoredRead> {
        let mut current_read_builder: Option<VectoredReadBuilder> = None;
        let mut reads = Vec::new();

        for (key, blobs_for_key) in self.blobs {
            for (lsn, start_offset, end_offset) in blobs_for_key {
                let extended = match &mut current_read_builder {
                    Some(read_builder) => {
                        read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
                    }
                    None => VectoredReadExtended::No,
                };

                if extended == VectoredReadExtended::No {
                    let next_read_builder = VectoredReadBuilder::new(
                        start_offset,
                        end_offset,
                        BlobMeta { key, lsn },
                        self.max_read_size,
                        self.mode,
                    );

                    let prev_read_builder = current_read_builder.replace(next_read_builder);

                    // `current_read_builder` is None in the first iteration of the outer loop
                    if let Some(read_builder) = prev_read_builder {
                        reads.push(read_builder.build());
                    }
                }
            }
        }

        if let Some(read_builder) = current_read_builder {
            reads.push(read_builder.build());
        }

        reads
    }
}

/// Disk reader for vectored blob spans (does not go through the page cache)
pub struct VectoredBlobReader<'a> {
    file: &'a VirtualFile,
}

impl<'a> VectoredBlobReader<'a> {
    pub fn new(file: &'a VirtualFile) -> Self {
        Self { file }
    }

    /// Read the requested blobs into the buffer.
    ///
    /// We have to deal with the fact that blobs are not fixed size.
    /// Each blob is prefixed by a size header.
    ///
    /// The success return value is a struct which contains the buffer
    /// filled from disk and a list of offsets at which each blob lies
    /// in the buffer.
    pub async fn read_blobs(
        &self,
        read: &VectoredRead,
        buf: BytesMut,
        ctx: &RequestContext,
    ) -> Result<VectoredBlobsBuf, std::io::Error> {
        assert!(read.size() > 0);
        assert!(
            read.size() <= buf.capacity(),
            "{} > {}",
            read.size(),
            buf.capacity()
        );

        if cfg!(debug_assertions) {
            let align = virtual_file::get_io_buffer_alignment() as u64;
            debug_assert_eq!(
                read.start % align,
                0,
                "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
                read.start,
                align
            );
        }

        let mut buf = self
            .file
            .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
            .await?
            .into_inner();

        let blobs_at = read.blobs_at.as_slice();

        let start_offset = read.start;

        let mut metas = Vec::with_capacity(blobs_at.len());
        // Blobs in `read` only provide their starting offset. The end offset
        // of a blob is implicit: the start of the next blob if one exists
        // or the end of the read.

        // Some scratch space, put here for reusing the allocation
        let mut decompressed_vec = Vec::new();

        for (blob_start, meta) in blobs_at {
            let blob_start_in_buf = blob_start - start_offset;
            let first_len_byte = buf[blob_start_in_buf as usize];

            // Each blob is prefixed by a header containing its size and compression information.
            // Extract the size and skip that header to find the start of the data.
            // The length field can be 1 or 4 bytes: the most significant bit of the first
            // byte is 0 in the 1-byte case and 1 in the 4-byte case.
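            // Worked example, assuming the constants defined in `blob_io.rs`
            // (`LEN_COMPRESSION_BIT_MASK` = 0xf0, `BYTE_UNCOMPRESSED` = 0x80):
            //   * header [0x03]: 1-byte length field, uncompressed blob of size 3;
            //   * header [0x80, 0x00, 0x10, 0x00]: 4-byte length field; masking off the
            //     top nibble of the first byte yields size 0x001000 = 4096, and the
            //     compression bits 0x80 & 0xf0 = 0x80 mark the blob as uncompressed.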
            let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
                (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
            } else {
                let mut blob_size_buf = [0u8; 4];
                let offset_in_buf = blob_start_in_buf as usize;

                blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
                blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;

                let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
                (
                    4,
                    u32::from_be_bytes(blob_size_buf) as u64,
                    compression_bits,
                )
            };

            let start_raw = blob_start_in_buf + size_length;
            let end_raw = start_raw + blob_size;
            let (start, end);
            if compression_bits == BYTE_UNCOMPRESSED {
                start = start_raw as usize;
                end = end_raw as usize;
            } else if compression_bits == BYTE_ZSTD {
                let mut decoder =
                    async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
                decoder
                    .write_all(&buf[start_raw as usize..end_raw as usize])
                    .await?;
                decoder.flush().await?;
                start = buf.len();
                buf.extend_from_slice(&decompressed_vec);
                end = buf.len();
                decompressed_vec.clear();
            } else {
                let error = std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("invalid compression byte {compression_bits:x}"),
                );
                return Err(error);
            }

            metas.push(VectoredBlob {
                start,
                end,
                meta: *meta,
            });
        }

        Ok(VectoredBlobsBuf { buf, blobs: metas })
    }
}

/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch from `handle` once the current batch reaches the
/// `max_read_size` or `max_cnt` constraint.
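///
/// A sketch of the streaming flow (hypothetical `index_entries` and `end_offset`;
/// batches are drained as they are produced rather than collected at the end as
/// in [`VectoredReadPlanner`]):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(128 * 1024, 100);
/// for (key, lsn, offset) in index_entries {
///     if let Some(read) = planner.handle(key, lsn, offset) {
///         // execute `read` with a `VectoredBlobReader`
///     }
/// }
/// if let Some(read) = planner.handle_range_end(end_offset) {
///     // execute the final batch
/// }
/// ```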
pub struct StreamingVectoredReadPlanner {
    read_builder: Option<VectoredReadBuilder>,
    // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
    prev: Option<(Key, Lsn, u64)>,
    /// Max read size per batch. This is not a strict limit: if one blob spans [0, 100) and the
    /// next [100, 200) while `max_read_size` is 150, we will produce a single batch instead of
    /// splitting them.
    max_read_size: u64,
    /// Max item count per batch
    max_cnt: usize,
    /// Number of blobs in the current batch
    cnt: usize,

    mode: VectoredReadCoalesceMode,
}

impl StreamingVectoredReadPlanner {
    pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
        assert!(max_cnt > 0);
        assert!(max_read_size > 0);
        let mode = VectoredReadCoalesceMode::get();
        Self {
            read_builder: None,
            prev: None,
            max_cnt,
            max_read_size,
            cnt: 0,
            mode,
        }
    }

    pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
        // Implementation note: internally lag behind by one blob such that
        // we have a start and end offset when initialising [`VectoredRead`]
        let (prev_key, prev_lsn, prev_offset) = match self.prev {
            None => {
                self.prev = Some((key, lsn, offset));
                return None;
            }
            Some(prev) => prev,
        };

        let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);

        self.prev = Some((key, lsn, offset));

        res
    }

    pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
        let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
            self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
        } else {
            None
        };

        self.prev = None;

        res
    }

    fn add_blob(
        &mut self,
        key: Key,
        lsn: Lsn,
        start_offset: u64,
        end_offset: u64,
        is_last_blob_in_read: bool,
    ) -> Option<VectoredRead> {
        match &mut self.read_builder {
            Some(read_builder) => {
                let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
                assert_eq!(extended, VectoredReadExtended::Yes);
            }
            None => {
                self.read_builder = {
                    Some(VectoredReadBuilder::new_streaming(
                        start_offset,
                        end_offset,
                        BlobMeta { key, lsn },
                        self.mode,
                    ))
                };
            }
        }
        let read_builder = self.read_builder.as_mut().unwrap();
        self.cnt += 1;
        if is_last_blob_in_read
            || read_builder.size() >= self.max_read_size as usize
            || self.cnt >= self.max_cnt
        {
            let prev_read_builder = self.read_builder.take();
            self.cnt = 0;

            // `self.read_builder` was populated above, so `prev_read_builder` is always `Some` here
            if let Some(read_builder) = prev_read_builder {
                return Some(read_builder.build());
            }
        }
        None
    }
}

#[cfg(test)]
mod tests {
    use anyhow::Error;

    use crate::context::DownloadBehavior;
    use crate::page_cache::PAGE_SZ;
    use crate::task_mgr::TaskKind;

    use super::super::blob_io::tests::{random_array, write_maybe_compressed};
    use super::*;

    fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
        let align = virtual_file::get_io_buffer_alignment() as u64;
        assert_eq!(read.start % align, 0);
        assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);

        let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();

        let offsets_in_read: Vec<_> = read
            .blobs_at
            .as_slice()
            .iter()
            .map(|(offset, _)| *offset)
            .collect();

        assert_eq!(expected_offsets_in_read, offsets_in_read);
    }

    #[test]
    fn planner_chunked_coalesce_all_test() {
        use crate::virtual_file;

        const CHUNK_SIZE: u64 = 512;
        virtual_file::set_io_buffer_alignment(CHUNK_SIZE as usize).unwrap();
        let max_read_size = CHUNK_SIZE as usize * 8;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = [
            (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
            (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
            (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
            (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
            (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
            (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
        ];

        let ranges = [
            &[
                blob_descriptions[0],
                blob_descriptions[2],
                blob_descriptions[4],
                blob_descriptions[5],
                blob_descriptions[7],
                blob_descriptions[8],
                blob_descriptions[10],
            ],
            &blob_descriptions[11..12],
            &blob_descriptions[13..],
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),  // Last in read 1
            (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
            (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
            (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
            (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
            (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..4],
            &blob_descriptions[4..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();

        assert_eq!(reads.len(), 6);

        // TODO: could remove zero reads to produce 5 reads here

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn planner_replacement_test() {
        let max_read_size = 128 * 1024;
        let first_key = Key::MIN;
        let second_key = first_key.next();
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (first_key, lsn, 0, BlobFlag::None),    // First in read 1
            (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
            (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
            (second_key, lsn, 3 * 1024, BlobFlag::None),
            (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
            (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
        ];

        let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(6 * 1024);

        let reads = planner.finish();
        assert_eq!(reads.len(), 2);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn streaming_planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),
            (key, lsn, 128 * 1024, BlobFlag::None),
            (key, lsn, 198 * 1024, BlobFlag::None),
            (key, lsn, 268 * 1024, BlobFlag::None),
            (key, lsn, 396 * 1024, BlobFlag::None),
            (key, lsn, 652 * 1024, BlobFlag::None),
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
        let mut reads = Vec::new();
        for (key, lsn, offset, _) in blob_descriptions.clone() {
            reads.extend(planner.handle(key, lsn, offset));
        }
        reads.extend(planner.handle_range_end(652 * 1024));

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn streaming_planner_max_cnt_test() {
        let max_read_size = 1024 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),
            (key, lsn, 128 * 1024, BlobFlag::None),
            (key, lsn, 198 * 1024, BlobFlag::None),
            (key, lsn, 268 * 1024, BlobFlag::None),
            (key, lsn, 396 * 1024, BlobFlag::None),
            (key, lsn, 652 * 1024, BlobFlag::None),
        ];

        let ranges = [
            &blob_descriptions[0..2],
            &blob_descriptions[2..4],
            &blob_descriptions[4..6],
            &blob_descriptions[6..],
        ];

        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
        let mut reads = Vec::new();
        for (key, lsn, offset, _) in blob_descriptions.clone() {
            reads.extend(planner.handle(key, lsn, offset));
        }
        reads.extend(planner.handle_range_end(652 * 1024));

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn streaming_planner_edge_test() {
        let max_read_size = 1024 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle_range_end(652 * 1024));
            assert!(reads.is_empty());
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 1);
            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle(key, lsn, 128 * 1024));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 2);
            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
            validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle(key, lsn, 128 * 1024));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 1);
            validate_read(
                &reads[0],
                &[
                    (key, lsn, 0, BlobFlag::None),
                    (key, lsn, 128 * 1024, BlobFlag::None),
                ],
            );
        }
    }

    async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        let (_temp_dir, pathbuf, offsets) =
            write_maybe_compressed::<true>(blobs, compression, &ctx).await?;

        let file = VirtualFile::open(&pathbuf, &ctx).await?;
        let file_len = std::fs::metadata(&pathbuf)?.len();

        // Multiply by two (compressed data might need more space), and add a few bytes for the header
        let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
        let mut buf = BytesMut::with_capacity(reserved_bytes);

        let mode = VectoredReadCoalesceMode::get();
        let vectored_blob_reader = VectoredBlobReader::new(&file);
        let meta = BlobMeta {
            key: Key::MIN,
            lsn: Lsn(0),
        };

        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let end = offsets.get(idx + 1).unwrap_or(&file_len);
            if idx + 1 == offsets.len() {
                continue;
            }
            let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
            let read = read_builder.build();
            let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
            assert_eq!(result.blobs.len(), 1);
            let read_blob = &result.blobs[0];
            let read_buf = &result.buf[read_blob.start..read_blob.end];
            assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
            buf = result.buf;
        }
        Ok(())
    }
    1027              :     #[tokio::test]
    1028            6 :     async fn test_really_big_array() -> Result<(), Error> {
    1029            6 :         let blobs = &[
    1030            6 :             b"test".to_vec(),
    1031            6 :             random_array(10 * PAGE_SZ),
    1032            6 :             b"hello".to_vec(),
    1033            6 :             random_array(66 * PAGE_SZ),
    1034            6 :             vec![0xf3; 24 * PAGE_SZ],
    1035            6 :             b"foobar".to_vec(),
    1036            6 :         ];
    1037           42 :         round_trip_test_compressed(blobs, false).await?;
    1038           33 :         round_trip_test_compressed(blobs, true).await?;
    1039            6 :         Ok(())
    1040            6 :     }
    1041              : 
    1042              :     #[tokio::test]
    1043            6 :     async fn test_arrays_inc() -> Result<(), Error> {
    1044            6 :         let blobs = (0..PAGE_SZ / 8)
    1045         6144 :             .map(|v| random_array(v * 16))
    1046            6 :             .collect::<Vec<_>>();
    1047         3516 :         round_trip_test_compressed(&blobs, false).await?;
    1048         3516 :         round_trip_test_compressed(&blobs, true).await?;
    1049            6 :         Ok(())
    1050            6 :     }
    1051              : 
    1052              :     #[test]
    1053            6 :     fn test_div_round_up() {
    1054            6 :         const CHUNK_SIZE: usize = 512;
    1055            6 :         assert_eq!(1, div_round_up(200, CHUNK_SIZE));
    1056            6 :         assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
    1057            6 :         assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
    1058            6 :     }
    1059              : }