LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions)
Test: 2a9d99866121f170b43760bd62e1e2431e597707.info
Test Date: 2024-09-02 14:10:37
Coverage:  Lines: 98.3 % (688 of 700)    Functions: 100.0 % (47 of 47)

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with a 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
      12              : //! adjacent blocks into a single disk IO request, and executed by
      13              : //! [`VectoredBlobReader`], which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
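                       : //!
                       : //! A minimal sketch of the planning half, with illustrative key, lsn and
                       : //! offset values (execution is shown at [`VectoredBlobReader::read_blobs`]):
                       : //!
                       : //! ```ignore
                       : //! let mut planner = VectoredReadPlanner::new(128 * 1024);
                       : //! planner.handle(key, lsn, blob_start_offset, BlobFlag::None);
                       : //! planner.handle_range_end(range_end_offset);
                       : //! let reads: Vec<VectoredRead> = planner.finish();
                       : //! ```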
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::num::NonZeroUsize;
      20              : 
      21              : use bytes::BytesMut;
      22              : use pageserver_api::key::Key;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::BoundedBuf;
      25              : use utils::lsn::Lsn;
      26              : use utils::vec_map::VecMap;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
      30              : use crate::virtual_file::{self, VirtualFile};
      31              : 
      32              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      33              : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
      34              : 
      35              : /// Metadata bundled with the start and end offset of a blob.
      36              : #[derive(Copy, Clone, Debug)]
      37              : pub struct BlobMeta {
      38              :     pub key: Key,
      39              :     pub lsn: Lsn,
      40              : }
      41              : 
      42              : /// Blob offsets into [`VectoredBlobsBuf::buf`]
      43              : pub struct VectoredBlob {
      44              :     pub start: usize,
      45              :     pub end: usize,
      46              :     pub meta: BlobMeta,
      47              : }
      48              : 
      49              : /// Return type of [`VectoredBlobReader::read_blobs`]
      50              : pub struct VectoredBlobsBuf {
      51              :     /// Buffer for all blobs in this read
      52              :     pub buf: BytesMut,
      53              :     /// Offsets into the buffer and metadata for all blobs in this read
      54              :     pub blobs: Vec<VectoredBlob>,
      55              : }
      56              : 
      57              : /// Description of one disk read for multiple blobs.
      58              : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
      59              : #[derive(Debug)]
      60              : pub struct VectoredRead {
      61              :     pub start: u64,
      62              :     pub end: u64,
      63              :     /// Start offset and metadata for each blob in this read
      64              :     pub blobs_at: VecMap<u64, BlobMeta>,
      65              : }
      66              : 
      67              : impl VectoredRead {
      68      2849955 :     pub(crate) fn size(&self) -> usize {
      69      2849955 :         (self.end - self.start) as usize
      70      2849955 :     }
      71              : }
      72              : 
      73              : #[derive(Eq, PartialEq, Debug)]
      74              : pub(crate) enum VectoredReadExtended {
      75              :     Yes,
      76              :     No,
      77              : }
      78              : 
      79              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      80              : pub enum VectoredReadCoalesceMode {
      81              :     /// Only coalesce exactly adjacent reads.
      82              :     AdjacentOnly,
      83              :     /// In addition to adjacent reads, also consider reads whose corresponding
      84              :     /// `end` and `start` offsets reside in the same chunk.
      85              :     Chunked(usize),
      86              : }
      87              : 
      88              : impl VectoredReadCoalesceMode {
      89              :     /// [`AdjacentVectoredReadBuilder`] is used if the alignment requirement is 0,
      90              :     /// whereas [`ChunkedVectoredReadBuilder`] is used for an alignment requirement of 1 or higher.
      91       641733 :     pub(crate) fn get() -> Self {
      92       641733 :         let align = virtual_file::get_io_buffer_alignment_raw();
      93       641733 :         if align == 0 {
      94       213920 :             VectoredReadCoalesceMode::AdjacentOnly
      95              :         } else {
      96       427813 :             VectoredReadCoalesceMode::Chunked(align)
      97              :         }
      98       641733 :     }
      99              : }
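                       : 
                       : // Mode selection sketched: the process-wide io buffer alignment decides
                       : // which builder the planners use (alignment values illustrative):
                       : //
                       : //     let mode = VectoredReadCoalesceMode::get();
                       : //     // align == 0   => VectoredReadCoalesceMode::AdjacentOnly
                       : //     // align == 512 => VectoredReadCoalesceMode::Chunked(512)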
     100              : 
     101              : pub(crate) enum VectoredReadBuilder {
     102              :     Adjacent(AdjacentVectoredReadBuilder),
     103              :     Chunked(ChunkedVectoredReadBuilder),
     104              : }
     105              : 
     106              : impl VectoredReadBuilder {
     107       624115 :     fn new_impl(
     108       624115 :         start_offset: u64,
     109       624115 :         end_offset: u64,
     110       624115 :         meta: BlobMeta,
     111       624115 :         max_read_size: Option<usize>,
     112       624115 :         mode: VectoredReadCoalesceMode,
     113       624115 :     ) -> Self {
     114       624115 :         match mode {
     115       213135 :             VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
     116       213135 :                 AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
     117       213135 :             ),
     118       410980 :             VectoredReadCoalesceMode::Chunked(chunk_size) => {
     119       410980 :                 Self::Chunked(ChunkedVectoredReadBuilder::new(
     120       410980 :                     start_offset,
     121       410980 :                     end_offset,
     122       410980 :                     meta,
     123       410980 :                     max_read_size,
     124       410980 :                     chunk_size,
     125       410980 :                 ))
     126              :             }
     127              :         }
     128       624115 :     }
     129              : 
     130       503185 :     pub(crate) fn new(
     131       503185 :         start_offset: u64,
     132       503185 :         end_offset: u64,
     133       503185 :         meta: BlobMeta,
     134       503185 :         max_read_size: usize,
     135       503185 :         mode: VectoredReadCoalesceMode,
     136       503185 :     ) -> Self {
     137       503185 :         Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
     138       503185 :     }
     139              : 
     140       120930 :     pub(crate) fn new_streaming(
     141       120930 :         start_offset: u64,
     142       120930 :         end_offset: u64,
     143       120930 :         meta: BlobMeta,
     144       120930 :         mode: VectoredReadCoalesceMode,
     145       120930 :     ) -> Self {
     146       120930 :         Self::new_impl(start_offset, end_offset, meta, None, mode)
     147       120930 :     }
     148              : 
     149      7333970 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     150      7333970 :         match self {
     151      2444738 :             VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
     152      4889232 :             VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
     153              :         }
     154      7333970 :     }
     155              : 
     156       624115 :     pub(crate) fn build(self) -> VectoredRead {
     157       624115 :         match self {
     158       213135 :             VectoredReadBuilder::Adjacent(builder) => builder.build(),
     159       410980 :             VectoredReadBuilder::Chunked(builder) => builder.build(),
     160              :         }
     161       624115 :     }
     162              : 
     163      6380026 :     pub(crate) fn size(&self) -> usize {
     164      6380026 :         match self {
     165      2126676 :             VectoredReadBuilder::Adjacent(builder) => builder.size(),
     166      4253350 :             VectoredReadBuilder::Chunked(builder) => builder.size(),
     167              :         }
     168      6380026 :     }
     169              : }
     170              : 
     171              : pub(crate) struct AdjacentVectoredReadBuilder {
     172              :     /// Start offset of the read.
     173              :     start: u64,
     174              :     /// End offset of the read.
     175              :     end: u64,
     176              :     /// Start offset and metadata for each blob in this read
     177              :     blobs_at: VecMap<u64, BlobMeta>,
     178              :     max_read_size: Option<usize>,
     179              : }
     180              : 
     181              : impl AdjacentVectoredReadBuilder {
     182              :     /// Start building a new vectored read.
     183              :     ///
     184              :     /// Note that by design, this does not check the first blob against `max_read_size`,
     185              :     /// so that blobs larger than the configured value can still be read. Such an oversized
     186              :     /// read cannot be extended further, however, making the builder effectively single use.
     187       213135 :     pub(crate) fn new(
     188       213135 :         start_offset: u64,
     189       213135 :         end_offset: u64,
     190       213135 :         meta: BlobMeta,
     191       213135 :         max_read_size: Option<usize>,
     192       213135 :     ) -> Self {
     193       213135 :         let mut blobs_at = VecMap::default();
     194       213135 :         blobs_at
     195       213135 :             .append(start_offset, meta)
     196       213135 :             .expect("First insertion always succeeds");
     197       213135 : 
     198       213135 :         Self {
     199       213135 :             start: start_offset,
     200       213135 :             end: end_offset,
     201       213135 :             blobs_at,
     202       213135 :             max_read_size,
     203       213135 :         }
     204       213135 :     }
     205              :     /// Attempt to extend the current read with a new blob if the start
     206              :     /// offset matches the current end of the vectored read
     207              :     /// and the resulting size is below the max read size.
     208      2444738 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     209      2444738 :         tracing::trace!(start, end, "trying to extend");
     210      2444738 :         let size = (end - start) as usize;
     211      2444738 :         let not_limited_by_max_read_size = {
     212      2444738 :             if let Some(max_read_size) = self.max_read_size {
     213       357710 :                 self.size() + size <= max_read_size
     214              :             } else {
     215      2087028 :                 true
     216              :             }
     217              :         };
     218              : 
     219      2444738 :         if self.end == start && not_limited_by_max_read_size {
     220      2406422 :             self.end = end;
     221      2406422 :             self.blobs_at
     222      2406422 :                 .append(start, meta)
     223      2406422 :                 .expect("LSNs are ordered within vectored reads");
     224      2406422 : 
     225      2406422 :             return VectoredReadExtended::Yes;
     226        38316 :         }
     227        38316 : 
     228        38316 :         VectoredReadExtended::No
     229      2444738 :     }
     230              : 
     231      2484386 :     pub(crate) fn size(&self) -> usize {
     232      2484386 :         (self.end - self.start) as usize
     233      2484386 :     }
     234              : 
     235       213135 :     pub(crate) fn build(self) -> VectoredRead {
     236       213135 :         VectoredRead {
     237       213135 :             start: self.start,
     238       213135 :             end: self.end,
     239       213135 :             blobs_at: self.blobs_at,
     240       213135 :         }
     241       213135 :     }
     242              : }
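                       : 
                       : // The adjacency rule worked through (a sketch; offsets illustrative):
                       : //
                       : //     let meta = BlobMeta { key: Key::MIN, lsn: Lsn(0) };
                       : //     let mut b = AdjacentVectoredReadBuilder::new(0, 100, meta, Some(1024));
                       : //     // starts exactly at the current end: coalesced
                       : //     assert_eq!(b.extend(100, 200, meta), VectoredReadExtended::Yes);
                       : //     // gap at [200, 300): rejected, so the caller starts a new read
                       : //     assert_eq!(b.extend(300, 400, meta), VectoredReadExtended::No);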
     243              : 
     244              : pub(crate) struct ChunkedVectoredReadBuilder {
     245              :     /// Start block number
     246              :     start_blk_no: usize,
     247              :     /// End block number (exclusive).
     248              :     end_blk_no: usize,
     249              :     /// Start offset and metadata for each blob in this read
     250              :     blobs_at: VecMap<u64, BlobMeta>,
     251              :     max_read_size: Option<usize>,
     252              :     /// Chunk size reads are coalesced into.
     253              :     chunk_size: usize,
     254              : }
     255              : 
     256              : /// Computes x / d rounded up.
     257      5300230 : fn div_round_up(x: usize, d: usize) -> usize {
     258      5300230 :     (x + (d - 1)) / d
     259      5300230 : }
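                       : 
                       : // For example, with d = 512: div_round_up(200, 512) == 1,
                       : // div_round_up(512, 512) == 1 and div_round_up(513, 512) == 2
                       : // (see `test_div_round_up` below).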
     260              : 
     261              : impl ChunkedVectoredReadBuilder {
     262              :     /// Start building a new vectored read.
     263              :     ///
     264              :     /// Note that by design, this does not check the first blob against `max_read_size`,
     265              :     /// so that blobs larger than the configured value can still be read. Such an oversized
     266              :     /// read cannot be extended further, however, making the builder effectively single use.
     267       410980 :     pub(crate) fn new(
     268       410980 :         start_offset: u64,
     269       410980 :         end_offset: u64,
     270       410980 :         meta: BlobMeta,
     271       410980 :         max_read_size: Option<usize>,
     272       410980 :         chunk_size: usize,
     273       410980 :     ) -> Self {
     274       410980 :         let mut blobs_at = VecMap::default();
     275       410980 :         blobs_at
     276       410980 :             .append(start_offset, meta)
     277       410980 :             .expect("First insertion always succeeds");
     278       410980 : 
     279       410980 :         let start_blk_no = start_offset as usize / chunk_size;
     280       410980 :         let end_blk_no = div_round_up(end_offset as usize, chunk_size);
     281       410980 :         Self {
     282       410980 :             start_blk_no,
     283       410980 :             end_blk_no,
     284       410980 :             blobs_at,
     285       410980 :             max_read_size,
     286       410980 :             chunk_size,
     287       410980 :         }
     288       410980 :     }
     289              : 
     290              :     /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
     291              :     ///
     292              :     /// The resulting size also must be below the max read size.
     293      4889232 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     294      4889232 :         tracing::trace!(start, end, "trying to extend");
     295      4889232 :         let start_blk_no = start as usize / self.chunk_size;
     296      4889232 :         let end_blk_no = div_round_up(end as usize, self.chunk_size);
     297              : 
     298      4889232 :         let not_limited_by_max_read_size = {
     299      4889232 :             if let Some(max_read_size) = self.max_read_size {
     300       715436 :                 let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
     301       715436 :                 coalesced_size <= max_read_size
     302              :             } else {
     303      4173796 :                 true
     304              :             }
     305              :         };
     306              : 
     307              :         // True if the new blob starts in the same chunk as, or in the chunk immediately after, the one where the current read ends.
     308              :         //
     309              :         // Note: This automatically handles the case where two blobs are byte-adjacent,
     310              :         // whether or not they start on a chunk size boundary.
     311      4889232 :         let is_adjacent_chunk_read = {
     312              :             // 1. first.end & second.start are in the same chunk
     313      4889232 :             self.end_blk_no == start_blk_no + 1 ||
     314              :             // 2. second.start is in the chunk immediately after the one where first ends
     315      2457497 :             self.end_blk_no == start_blk_no
     316              :         };
     317              : 
     318      4889232 :         if is_adjacent_chunk_read && not_limited_by_max_read_size {
     319      4827378 :             self.end_blk_no = end_blk_no;
     320      4827378 :             self.blobs_at
     321      4827378 :                 .append(start, meta)
     322      4827378 :                 .expect("LSNs are ordered within vectored reads");
     323      4827378 : 
     324      4827378 :             return VectoredReadExtended::Yes;
     325        61854 :         }
     326        61854 : 
     327        61854 :         VectoredReadExtended::No
     328      4889232 :     }
     329              : 
     330      4253350 :     pub(crate) fn size(&self) -> usize {
     331      4253350 :         (self.end_blk_no - self.start_blk_no) * self.chunk_size
     332      4253350 :     }
     333              : 
     334       410980 :     pub(crate) fn build(self) -> VectoredRead {
     335       410980 :         let start = (self.start_blk_no * self.chunk_size) as u64;
     336       410980 :         let end = (self.end_blk_no * self.chunk_size) as u64;
     337       410980 :         VectoredRead {
     338       410980 :             start,
     339       410980 :             end,
     340       410980 :             blobs_at: self.blobs_at,
     341       410980 :         }
     342       410980 :     }
     343              : }
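                       : 
                       : // Chunked coalescing worked through with chunk_size = 512 (a sketch;
                       : // offsets illustrative):
                       : //
                       : //     let meta = BlobMeta { key: Key::MIN, lsn: Lsn(0) };
                       : //     let mut b = ChunkedVectoredReadBuilder::new(100, 200, meta, Some(4096), 512);
                       : //     // [600, 700) starts in chunk 1, immediately after chunk 0 where the
                       : //     // current read ends, so it coalesces despite the byte gap.
                       : //     assert_eq!(b.extend(600, 700, meta), VectoredReadExtended::Yes);
                       : //     let read = b.build();
                       : //     // the built read is rounded out to chunk boundaries: [0, 1024)
                       : //     assert_eq!((read.start, read.end), (0, 1024));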
     344              : 
     345              : #[derive(Copy, Clone, Debug)]
     346              : pub enum BlobFlag {
     347              :     None,
     348              :     Ignore,
     349              :     ReplaceAll,
     350              : }
     351              : 
     352              : /// Planner for vectored blob reads.
     353              : ///
     354              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     355              : /// and coalesced into disk reads.
     356              : ///
     357              : /// The implementation is very simple:
     358              : /// * Collect all blob offsets in an ordered structure
     359              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     360              : pub struct VectoredReadPlanner {
     361              :     // Track all the blob offsets. Start offsets must be ordered.
     362              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     363              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     364              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     365              : 
     366              :     max_read_size: usize,
     367              : 
     368              :     mode: VectoredReadCoalesceMode,
     369              : }
     370              : 
     371              : impl VectoredReadPlanner {
     372       639441 :     pub fn new(max_read_size: usize) -> Self {
     373       639441 :         let mode = VectoredReadCoalesceMode::get();
     374       639441 :         Self {
     375       639441 :             blobs: BTreeMap::new(),
     376       639441 :             prev: None,
     377       639441 :             max_read_size,
     378       639441 :             mode,
     379       639441 :         }
     380       639441 :     }
     381              : 
     382              :     /// Include a new blob in the read plan.
     383              :     ///
     384              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     385              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     386              :     /// keys in a given keyspace. This function must be called for each key in the desired
     387              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     388              :     /// be called after every range, with the end offset of that range.
     389              :     ///
     390              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     391              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     392              :     /// incorrect data to the user.
     393              :     ///
     394              :     /// The `flag` argument has two interesting values:
     395              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     396              :     ///   This is used for WAL records that `will_init`.
     397              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     398              :     ///   if the blob is cached.
     399      4238202 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     400              :         // Implementation note: internally lag behind by one blob such that
     401              :         // we have a start and end offset when initialising [`VectoredRead`]
     402      4238202 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     403              :             None => {
     404       411411 :                 self.prev = Some((key, lsn, offset, flag));
     405       411411 :                 return;
     406              :             }
     407      3826791 :             Some(prev) => prev,
     408      3826791 :         };
     409      3826791 : 
     410      3826791 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     411      3826791 : 
     412      3826791 :         self.prev = Some((key, lsn, offset, flag));
     413      4238202 :     }
     414              : 
     415       831088 :     pub fn handle_range_end(&mut self, offset: u64) {
     416       831088 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     417       411411 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     418       419677 :         }
     419              : 
     420       831088 :         self.prev = None;
     421       831088 :     }
     422              : 
     423      4238202 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     424      4238202 :         match flag {
     425      1015008 :             BlobFlag::None => {
     426      1015008 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     427      1015008 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     428      1015008 :             }
     429       770482 :             BlobFlag::ReplaceAll => {
     430       770482 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     431       770482 :                 blobs_for_key.clear();
     432       770482 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     433       770482 :             }
     434      2452712 :             BlobFlag::Ignore => {}
     435              :         }
     436      4238202 :     }
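                       : 
                       :     // `BlobFlag::ReplaceAll` worked through (illustrative): after
                       :     //     handle(k, lsn1, 0, BlobFlag::None);
                       :     //     handle(k, lsn2, 100, BlobFlag::ReplaceAll);
                       :     //     handle_range_end(200);
                       :     // only the [100, 200) blob remains planned for `k`, because the
                       :     // `ReplaceAll` blob clears everything collected for that key so far.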
     437              : 
     438       639441 :     pub fn finish(self) -> Vec<VectoredRead> {
     439       639441 :         let mut current_read_builder: Option<VectoredReadBuilder> = None;
     440       639441 :         let mut reads = Vec::new();
     441              : 
     442      1949746 :         for (key, blobs_for_key) in self.blobs {
     443      2774034 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     444      1463729 :                 let extended = match &mut current_read_builder {
     445      1073080 :                     Some(read_builder) => {
     446      1073080 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     447              :                     }
     448       390649 :                     None => VectoredReadExtended::No,
     449              :                 };
     450              : 
     451      1463729 :                 if extended == VectoredReadExtended::No {
     452       490801 :                     let next_read_builder = VectoredReadBuilder::new(
     453       490801 :                         start_offset,
     454       490801 :                         end_offset,
     455       490801 :                         BlobMeta { key, lsn },
     456       490801 :                         self.max_read_size,
     457       490801 :                         self.mode,
     458       490801 :                     );
     459       490801 : 
     460       490801 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     461              : 
     462              :                     // `current_read_builder` is None in the first iteration of the outer loop
     463       490801 :                     if let Some(read_builder) = prev_read_builder {
     464       100152 :                         reads.push(read_builder.build());
     465       390649 :                     }
     466       972928 :                 }
     467              :             }
     468              :         }
     469              : 
     470       639441 :         if let Some(read_builder) = current_read_builder {
     471       390649 :             reads.push(read_builder.build());
     472       390649 :         }
     473              : 
     474       639441 :         reads
     475       639441 :     }
     476              : }
     477              : 
     478              : /// Disk reader for vectored blob spans (does not go through the page cache)
     479              : pub struct VectoredBlobReader<'a> {
     480              :     file: &'a VirtualFile,
     481              : }
     482              : 
     483              : impl<'a> VectoredBlobReader<'a> {
     484       760345 :     pub fn new(file: &'a VirtualFile) -> Self {
     485       760345 :         Self { file }
     486       760345 :     }
     487              : 
     488              :     /// Read the requested blobs into the buffer.
     489              :     ///
     490              :     /// We have to deal with the fact that blobs are not fixed size.
     491              :     /// Each blob is prefixed by a size header.
     492              :     ///
     493              :     /// The success return value is a struct which contains the buffer
     494              :     /// filled from disk and a list of offsets at which each blob lies
     495              :     /// in the buffer.
     496       623943 :     pub async fn read_blobs(
     497       623943 :         &self,
     498       623943 :         read: &VectoredRead,
     499       623943 :         buf: BytesMut,
     500       623943 :         ctx: &RequestContext,
     501       623943 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     502       623943 :         assert!(read.size() > 0);
     503       623943 :         assert!(
     504       623943 :             read.size() <= buf.capacity(),
     505            0 :             "{} > {}",
     506            0 :             read.size(),
     507            0 :             buf.capacity()
     508              :         );
     509              : 
     510       623943 :         if cfg!(debug_assertions) {
     511       623943 :             let align = virtual_file::get_io_buffer_alignment() as u64;
     512       623943 :             debug_assert_eq!(
     513       623943 :                 read.start % align,
     514              :                 0,
     515            0 :                 "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
     516              :                 read.start,
     517              :                 align
     518              :             );
     519            0 :         }
     520              : 
     521       623943 :         let mut buf = self
     522       623943 :             .file
     523       623943 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     524       317305 :             .await?
     525       623943 :             .into_inner();
     526       623943 : 
     527       623943 :         let blobs_at = read.blobs_at.as_slice();
     528       623943 : 
     529       623943 :         let start_offset = read.start;
     530       623943 : 
     531       623943 :         let mut metas = Vec::with_capacity(blobs_at.len());
     532       623943 :         // Blobs in `read` only provide their starting offset. The end offset
     533       623943 :         // of a blob is implicit: the start of the next blob if one exists
     534       623943 :         // or the end of the read.
     535       623943 : 
     536       623943 :         // Some scratch space, put here for reusing the allocation
     537       623943 :         let mut decompressed_vec = Vec::new();
     538              : 
     539      8481558 :         for (blob_start, meta) in blobs_at {
     540      7857615 :             let blob_start_in_buf = blob_start - start_offset;
     541      7857615 :             let first_len_byte = buf[blob_start_in_buf as usize];
     542              : 
     543              :             // Each blob is prefixed by a header containing its size and compression information.
     544              :             // Extract the size and skip that header to find the start of the data.
     545              :             // The length field can be 1 or 4 bytes: the most significant bit of the
     546              :             // first byte is 0 in the 1 byte case and 1 in the 4 byte case.
     547      7857615 :             let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
     548      7744539 :                 (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
     549              :             } else {
     550       113076 :                 let mut blob_size_buf = [0u8; 4];
     551       113076 :                 let offset_in_buf = blob_start_in_buf as usize;
     552       113076 : 
     553       113076 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     554       113076 :                 blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
     555       113076 : 
     556       113076 :                 let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     557       113076 :                 (
     558       113076 :                     4,
     559       113076 :                     u32::from_be_bytes(blob_size_buf) as u64,
     560       113076 :                     compression_bits,
     561       113076 :                 )
     562              :             };
     563              : 
     564      7857615 :             let start_raw = blob_start_in_buf + size_length;
     565      7857615 :             let end_raw = start_raw + blob_size;
     566      7857615 :             let (start, end);
     567      7857615 :             if compression_bits == BYTE_UNCOMPRESSED {
     568      7857609 :                 start = start_raw as usize;
     569      7857609 :                 end = end_raw as usize;
     570      7857609 :             } else if compression_bits == BYTE_ZSTD {
     571            6 :                 let mut decoder =
     572            6 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     573            6 :                 decoder
     574            6 :                     .write_all(&buf[start_raw as usize..end_raw as usize])
     575            0 :                     .await?;
     576            6 :                 decoder.flush().await?;
     577            6 :                 start = buf.len();
     578            6 :                 buf.extend_from_slice(&decompressed_vec);
     579            6 :                 end = buf.len();
     580            6 :                 decompressed_vec.clear();
     581              :             } else {
     582            0 :                 let error = std::io::Error::new(
     583            0 :                     std::io::ErrorKind::InvalidData,
     584            0 :                     format!("invalid compression byte {compression_bits:x}"),
     585            0 :                 );
     586            0 :                 return Err(error);
     587              :             }
     588              : 
     589      7857615 :             metas.push(VectoredBlob {
     590      7857615 :                 start,
     591      7857615 :                 end,
     592      7857615 :                 meta: *meta,
     593      7857615 :             });
     594              :         }
     595              : 
     596       623943 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     597       623943 :     }
     598              : }
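                       : 
                       : // Executing a planned read, sketched (assumes `file: VirtualFile`,
                       : // `read: VectoredRead` and `ctx: RequestContext` are in scope, as in the
                       : // round-trip tests below):
                       : //
                       : //     let reader = VectoredBlobReader::new(&file);
                       : //     let buf = BytesMut::with_capacity(read.size());
                       : //     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
                       : //     for blob in &blobs_buf.blobs {
                       : //         let bytes = &blobs_buf.buf[blob.start..blob.end];
                       : //         // one decoded (possibly decompressed) blob, tagged with
                       : //         // (blob.meta.key, blob.meta.lsn)
                       : //     }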
     599              : 
     600              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
     601              : /// getting read blobs. `handle` returns a batch once the accumulated read, including the current blob, reaches the
     602              : /// `max_read_size` or `max_cnt` constraint.
     603              : pub struct StreamingVectoredReadPlanner {
     604              :     read_builder: Option<VectoredReadBuilder>,
     605              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     606              :     prev: Option<(Key, Lsn, u64)>,
     607              :     /// Max read size per batch. This is not a strict limit. If there are blobs at [0, 100) and [100, 200) while `max_read_size` is 150,
     608              :     /// we will produce a single batch instead of splitting them.
     609              :     max_read_size: u64,
     610              :     /// Max item count per batch
     611              :     max_cnt: usize,
     612              :     /// Number of blobs in the current batch
     613              :     cnt: usize,
     614              : 
     615              :     mode: VectoredReadCoalesceMode,
     616              : }
     617              : 
     618              : impl StreamingVectoredReadPlanner {
     619         2238 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     620         2238 :         assert!(max_cnt > 0);
     621         2238 :         assert!(max_read_size > 0);
     622         2238 :         let mode = VectoredReadCoalesceMode::get();
     623         2238 :         Self {
     624         2238 :             read_builder: None,
     625         2238 :             prev: None,
     626         2238 :             max_cnt,
     627         2238 :             max_read_size,
     628         2238 :             cnt: 0,
     629         2238 :             mode,
     630         2238 :         }
     631         2238 :     }
     632              : 
     633      6381922 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
     634              :         // Implementation note: internally lag behind by one blob such that
     635              :         // we have a start and end offset when initialising [`VectoredRead`]
     636      6381922 :         let (prev_key, prev_lsn, prev_offset) = match self.prev {
     637              :             None => {
     638         1896 :                 self.prev = Some((key, lsn, offset));
     639         1896 :                 return None;
     640              :             }
     641      6380026 :             Some(prev) => prev,
     642      6380026 :         };
     643      6380026 : 
     644      6380026 :         let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
     645      6380026 : 
     646      6380026 :         self.prev = Some((key, lsn, offset));
     647      6380026 : 
     648      6380026 :         res
     649      6381922 :     }
     650              : 
     651         1734 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     652         1734 :         let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
     653         1728 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
     654              :         } else {
     655            6 :             None
     656              :         };
     657              : 
     658         1734 :         self.prev = None;
     659         1734 : 
     660         1734 :         res
     661         1734 :     }
     662              : 
     663      6381754 :     fn add_blob(
     664      6381754 :         &mut self,
     665      6381754 :         key: Key,
     666      6381754 :         lsn: Lsn,
     667      6381754 :         start_offset: u64,
     668      6381754 :         end_offset: u64,
     669      6381754 :         is_last_blob_in_read: bool,
     670      6381754 :     ) -> Option<VectoredRead> {
     671      6381754 :         match &mut self.read_builder {
     672      6260824 :             Some(read_builder) => {
     673      6260824 :                 let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
     674      6260824 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     675              :             }
     676       120930 :             None => {
     677       120930 :                 self.read_builder = {
     678       120930 :                     Some(VectoredReadBuilder::new_streaming(
     679       120930 :                         start_offset,
     680       120930 :                         end_offset,
     681       120930 :                         BlobMeta { key, lsn },
     682       120930 :                         self.mode,
     683       120930 :                     ))
     684       120930 :                 };
     685       120930 :             }
     686              :         }
     687      6381754 :         let read_builder = self.read_builder.as_mut().unwrap();
     688      6381754 :         self.cnt += 1;
     689      6381754 :         if is_last_blob_in_read
     690      6380026 :             || read_builder.size() >= self.max_read_size as usize
     691      6295054 :             || self.cnt >= self.max_cnt
     692              :         {
     693       120930 :             let prev_read_builder = self.read_builder.take();
     694       120930 :             self.cnt = 0;
     695              : 
     696              :             // `current_read_builder` is None in the first iteration
     697       120930 :             if let Some(read_builder) = prev_read_builder {
     698       120930 :                 return Some(read_builder.build());
     699            0 :             }
     700      6260824 :         }
     701      6260824 :         None
     702      6381754 :     }
     703              : }
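                       : 
                       : // Streaming usage sketched (constraint values illustrative): `handle`
                       : // yields a batch once `max_read_size` or `max_cnt` is reached, and
                       : // `handle_range_end` flushes the final partial batch:
                       : //
                       : //     let mut planner = StreamingVectoredReadPlanner::new(64 * 1024, 32);
                       : //     let mut reads = Vec::new();
                       : //     for (key, lsn, offset) in blob_offsets {
                       : //         reads.extend(planner.handle(key, lsn, offset));
                       : //     }
                       : //     reads.extend(planner.handle_range_end(range_end_offset));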
     704              : 
     705              : #[cfg(test)]
     706              : mod tests {
     707              :     use anyhow::Error;
     708              : 
     709              :     use crate::context::DownloadBehavior;
     710              :     use crate::page_cache::PAGE_SZ;
     711              :     use crate::task_mgr::TaskKind;
     712              : 
     713              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     714              :     use super::*;
     715              : 
     716          132 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     717          132 :         let align = virtual_file::get_io_buffer_alignment() as u64;
     718          132 :         assert_eq!(read.start % align, 0);
     719          132 :         assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
     720              : 
     721          216 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     722          132 : 
     723          132 :         let offsets_in_read: Vec<_> = read
     724          132 :             .blobs_at
     725          132 :             .as_slice()
     726          132 :             .iter()
     727          216 :             .map(|(offset, _)| *offset)
     728          132 :             .collect();
     729          132 : 
     730          132 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     731          132 :     }
     732              : 
     733              :     #[test]
     734            6 :     fn planner_chunked_coalesce_all_test() {
     735            6 :         use crate::virtual_file;
     736            6 : 
     737            6 :         let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
     738            6 : 
     739            6 :         // The test explicitly does not check chunk size < 512
     740            6 :         if chunk_size < 512 {
     741            4 :             return;
     742            2 :         }
     743            2 : 
     744            2 :         let max_read_size = chunk_size as usize * 8;
     745            2 :         let key = Key::MIN;
     746            2 :         let lsn = Lsn(0);
     747            2 : 
     748            2 :         let blob_descriptions = [
     749            2 :             (key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
     750            2 :             (key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
     751            2 :             (key, lsn, chunk_size / 2, BlobFlag::None),
     752            2 :             (key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
     753            2 :             (key, lsn, chunk_size, BlobFlag::None),
     754            2 :             (key, lsn, chunk_size * 2 - 1, BlobFlag::None),
     755            2 :             (key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
     756            2 :             (key, lsn, chunk_size * 3 + 1, BlobFlag::None),
     757            2 :             (key, lsn, chunk_size * 5 + 1, BlobFlag::None),
     758            2 :             (key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
     759            2 :             (key, lsn, chunk_size * 7 + 1, BlobFlag::None),
     760            2 :             (key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
     761            2 :             (key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
     762            2 :             (key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
     763            2 :         ];
     764            2 : 
     765            2 :         let ranges = [
     766            2 :             &[
     767            2 :                 blob_descriptions[0],
     768            2 :                 blob_descriptions[2],
     769            2 :                 blob_descriptions[4],
     770            2 :                 blob_descriptions[5],
     771            2 :                 blob_descriptions[7],
     772            2 :                 blob_descriptions[8],
     773            2 :                 blob_descriptions[10],
     774            2 :             ],
     775            2 :             &blob_descriptions[11..12],
     776            2 :             &blob_descriptions[13..],
     777            2 :         ];
     778            2 : 
     779            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     780           30 :         for (key, lsn, offset, flag) in blob_descriptions {
     781           28 :             planner.handle(key, lsn, offset, flag);
     782           28 :         }
     783              : 
     784            2 :         planner.handle_range_end(652 * 1024);
     785            2 : 
     786            2 :         let reads = planner.finish();
     787            2 : 
     788            2 :         assert_eq!(reads.len(), ranges.len());
     789              : 
     790            6 :         for (idx, read) in reads.iter().enumerate() {
     791            6 :             validate_read(read, ranges[idx]);
     792            6 :         }
     793            6 :     }
     794              : 
     795              :     #[test]
     796            6 :     fn planner_max_read_size_test() {
     797            6 :         let max_read_size = 128 * 1024;
     798            6 :         let key = Key::MIN;
     799            6 :         let lsn = Lsn(0);
     800            6 : 
     801            6 :         let blob_descriptions = vec![
     802            6 :             (key, lsn, 0, BlobFlag::None),
     803            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     804            6 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     805            6 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     806            6 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     807            6 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     808            6 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     809            6 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     810            6 :         ];
     811            6 : 
     812            6 :         let ranges = [
     813            6 :             &blob_descriptions[0..3],
     814            6 :             &blob_descriptions[3..4],
     815            6 :             &blob_descriptions[4..5],
     816            6 :             &blob_descriptions[5..6],
     817            6 :             &blob_descriptions[6..7],
     818            6 :             &blob_descriptions[7..],
     819            6 :         ];
     820            6 : 
     821            6 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     822           48 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     823           48 :             planner.handle(key, lsn, offset, flag);
     824           48 :         }
     825              : 
     826            6 :         planner.handle_range_end(652 * 1024);
     827            6 : 
     828            6 :         let reads = planner.finish();
     829            6 : 
     830            6 :         assert_eq!(reads.len(), 6);
     831              : 
     832              :         // TODO: could remove zero reads to produce 5 reads here
     833              : 
     834           36 :         for (idx, read) in reads.iter().enumerate() {
     835           36 :             validate_read(read, ranges[idx]);
     836           36 :         }
     837            6 :     }
     838              : 
     839              :     #[test]
     840            6 :     fn planner_replacement_test() {
     841            6 :         let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
     842            6 :         let max_read_size = 128 * chunk_size as usize;
     843            6 :         let first_key = Key::MIN;
     844            6 :         let second_key = first_key.next();
     845            6 :         let lsn = Lsn(0);
     846            6 : 
     847            6 :         let blob_descriptions = vec![
     848            6 :             (first_key, lsn, 0, BlobFlag::None),          // First in read 1
     849            6 :             (first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
     850            6 :             (second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
     851            6 :             (second_key, lsn, 3 * chunk_size, BlobFlag::None),
     852            6 :             (second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
     853            6 :             (second_key, lsn, 5 * chunk_size, BlobFlag::None),       // Last in read 2
     854            6 :         ];
     855            6 : 
     856            6 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     857            6 : 
     858            6 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     859           36 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     860           36 :             planner.handle(key, lsn, offset, flag);
     861           36 :         }
     862              : 
     863            6 :         planner.handle_range_end(6 * chunk_size);
     864            6 : 
     865            6 :         let reads = planner.finish();
     866            6 :         assert_eq!(reads.len(), 2);
     867              : 
     868           12 :         for (idx, read) in reads.iter().enumerate() {
     869           12 :             validate_read(read, ranges[idx]);
     870           12 :         }
     871            6 :     }
     872              : 
     873              :     #[test]
     874            6 :     fn streaming_planner_max_read_size_test() {
     875            6 :         let max_read_size = 128 * 1024;
     876            6 :         let key = Key::MIN;
     877            6 :         let lsn = Lsn(0);
     878            6 : 
     879            6 :         let blob_descriptions = vec![
     880            6 :             (key, lsn, 0, BlobFlag::None),
     881            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     882            6 :             (key, lsn, 96 * 1024, BlobFlag::None),
     883            6 :             (key, lsn, 128 * 1024, BlobFlag::None),
     884            6 :             (key, lsn, 198 * 1024, BlobFlag::None),
     885            6 :             (key, lsn, 268 * 1024, BlobFlag::None),
     886            6 :             (key, lsn, 396 * 1024, BlobFlag::None),
     887            6 :             (key, lsn, 652 * 1024, BlobFlag::None),
     888            6 :         ];
     889            6 : 
     890            6 :         let ranges = [
     891            6 :             &blob_descriptions[0..3],
     892            6 :             &blob_descriptions[3..5],
     893            6 :             &blob_descriptions[5..6],
     894            6 :             &blob_descriptions[6..7],
     895            6 :             &blob_descriptions[7..],
     896            6 :         ];
     897            6 : 
     898            6 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     899            6 :         let mut reads = Vec::new();
     900           48 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     901           48 :             reads.extend(planner.handle(key, lsn, offset));
     902           48 :         }
     903            6 :         reads.extend(planner.handle_range_end(652 * 1024));
     904            6 : 
     905            6 :         assert_eq!(reads.len(), ranges.len());
     906              : 
     907           30 :         for (idx, read) in reads.iter().enumerate() {
     908           30 :             validate_read(read, ranges[idx]);
     909           30 :         }
     910            6 :     }
     911              : 
     912              :     #[test]
     913            6 :     fn streaming_planner_max_cnt_test() {
     914            6 :         let max_read_size = 1024 * 1024;
     915            6 :         let key = Key::MIN;
     916            6 :         let lsn = Lsn(0);
     917            6 : 
     918            6 :         let blob_descriptions = vec![
     919            6 :             (key, lsn, 0, BlobFlag::None),
     920            6 :             (key, lsn, 32 * 1024, BlobFlag::None),
     921            6 :             (key, lsn, 96 * 1024, BlobFlag::None),
     922            6 :             (key, lsn, 128 * 1024, BlobFlag::None),
     923            6 :             (key, lsn, 198 * 1024, BlobFlag::None),
     924            6 :             (key, lsn, 268 * 1024, BlobFlag::None),
     925            6 :             (key, lsn, 396 * 1024, BlobFlag::None),
     926            6 :             (key, lsn, 652 * 1024, BlobFlag::None),
     927            6 :         ];
     928            6 : 
     929            6 :         let ranges = [
     930            6 :             &blob_descriptions[0..2],
     931            6 :             &blob_descriptions[2..4],
     932            6 :             &blob_descriptions[4..6],
     933            6 :             &blob_descriptions[6..],
     934            6 :         ];
     935            6 : 
     936            6 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     937            6 :         let mut reads = Vec::new();
     938           48 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     939           48 :             reads.extend(planner.handle(key, lsn, offset));
     940           48 :         }
     941            6 :         reads.extend(planner.handle_range_end(652 * 1024));
     942            6 : 
     943            6 :         assert_eq!(reads.len(), ranges.len());
     944              : 
     945           24 :         for (idx, read) in reads.iter().enumerate() {
     946           24 :             validate_read(read, ranges[idx]);
     947           24 :         }
     948            6 :     }
     949              : 
     950              :     #[test]
     951            6 :     fn streaming_planner_edge_test() {
     952            6 :         let max_read_size = 1024 * 1024;
     953            6 :         let key = Key::MIN;
     954            6 :         let lsn = Lsn(0);
     955            6 :         {
     956            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     957            6 :             let mut reads = Vec::new();
     958            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     959            6 :             assert!(reads.is_empty());
     960              :         }
     961              :         {
     962            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     963            6 :             let mut reads = Vec::new();
     964            6 :             reads.extend(planner.handle(key, lsn, 0));
     965            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     966            6 :             assert_eq!(reads.len(), 1);
     967            6 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     968            6 :         }
     969            6 :         {
     970            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     971            6 :             let mut reads = Vec::new();
     972            6 :             reads.extend(planner.handle(key, lsn, 0));
     973            6 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     974            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     975            6 :             assert_eq!(reads.len(), 2);
     976            6 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     977            6 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     978            6 :         }
     979            6 :         {
     980            6 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     981            6 :             let mut reads = Vec::new();
     982            6 :             reads.extend(planner.handle(key, lsn, 0));
     983            6 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     984            6 :             reads.extend(planner.handle_range_end(652 * 1024));
     985            6 :             assert_eq!(reads.len(), 1);
     986            6 :             validate_read(
     987            6 :                 &reads[0],
     988            6 :                 &[
     989            6 :                     (key, lsn, 0, BlobFlag::None),
     990            6 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     991            6 :                 ],
     992            6 :             );
     993            6 :         }
     994            6 :     }
     995              : 
     996           24 :     async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
     997           24 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     998           24 :         let (_temp_dir, pathbuf, offsets) =
     999          837 :             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
    1000              : 
    1001           24 :         let file = VirtualFile::open(&pathbuf, &ctx).await?;
    1002           24 :         let file_len = std::fs::metadata(&pathbuf)?.len();
    1003           24 : 
    1004           24 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
    1005        12360 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
    1006           24 :         let mut buf = BytesMut::with_capacity(reserved_bytes);
    1007           24 : 
    1008           24 :         let mode = VectoredReadCoalesceMode::get();
    1009           24 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
    1010           24 :         let meta = BlobMeta {
    1011           24 :             key: Key::MIN,
    1012           24 :             lsn: Lsn(0),
    1013           24 :         };
    1014              : 
    1015        12360 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
    1016        12360 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
    1017        12360 :             if idx + 1 == offsets.len() {
    1018           24 :                 continue;
    1019        12336 :             }
    1020        12336 :             let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
    1021        12336 :             let read = read_builder.build();
    1022        12336 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
    1023        12336 :             assert_eq!(result.blobs.len(), 1);
    1024        12336 :             let read_blob = &result.blobs[0];
    1025        12336 :             let read_buf = &result.buf[read_blob.start..read_blob.end];
    1026        12336 :             assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
    1027        12336 :             buf = result.buf;
    1028              :         }
    1029           24 :         Ok(())
    1030           24 :     }
    1031              : 
    1032              :     #[tokio::test]
    1033            6 :     async fn test_really_big_array() -> Result<(), Error> {
    1034            6 :         let blobs = &[
    1035            6 :             b"test".to_vec(),
    1036            6 :             random_array(10 * PAGE_SZ),
    1037            6 :             b"hello".to_vec(),
    1038            6 :             random_array(66 * PAGE_SZ),
    1039            6 :             vec![0xf3; 24 * PAGE_SZ],
    1040            6 :             b"foobar".to_vec(),
    1041            6 :         ];
    1042           42 :         round_trip_test_compressed(blobs, false).await?;
    1043           33 :         round_trip_test_compressed(blobs, true).await?;
    1044            6 :         Ok(())
    1045            6 :     }
    1046              : 
    1047              :     #[tokio::test]
    1048            6 :     async fn test_arrays_inc() -> Result<(), Error> {
    1049            6 :         let blobs = (0..PAGE_SZ / 8)
    1050         6144 :             .map(|v| random_array(v * 16))
    1051            6 :             .collect::<Vec<_>>();
    1052         3516 :         round_trip_test_compressed(&blobs, false).await?;
    1053         3516 :         round_trip_test_compressed(&blobs, true).await?;
    1054            6 :         Ok(())
    1055            6 :     }
    1056              : 
    1057              :     #[test]
    1058            6 :     fn test_div_round_up() {
    1059            6 :         const CHUNK_SIZE: usize = 512;
    1060            6 :         assert_eq!(1, div_round_up(200, CHUNK_SIZE));
    1061            6 :         assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
    1062            6 :         assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
    1063            6 :     }
    1064              : }
        

Generated by: LCOV version 2.1-beta