LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 94.4 % 664 627
Test Date: 2025-04-24 20:31:15 Functions: 89.8 % 49 44

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
      12              : //! adjacent blocks into a single disk IO request and exectuted by
      13              : //! [`VectoredBlobReader`] which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::ops::Deref;
      20              : 
      21              : use bytes::Bytes;
      22              : use pageserver_api::key::Key;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::BoundedBuf;
      25              : use utils::lsn::Lsn;
      26              : use utils::vec_map::VecMap;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header};
      30              : use crate::virtual_file::{self, IoBufferMut, VirtualFile};
      31              : 
      32              : /// Metadata bundled with the start and end offset of a blob.
      33              : #[derive(Copy, Clone, Debug)]
      34              : pub struct BlobMeta {
      35              :     pub key: Key,
      36              :     pub lsn: Lsn,
      37              :     pub will_init: bool,
      38              : }
      39              : 
      40              : /// A view into the vectored blobs read buffer.
      41              : #[derive(Clone, Debug)]
      42              : pub(crate) enum BufView<'a> {
      43              :     Slice(&'a [u8]),
      44              :     Bytes(bytes::Bytes),
      45              : }
      46              : 
      47              : impl<'a> BufView<'a> {
      48              :     /// Creates a new slice-based view on the blob.
      49      1615391 :     pub fn new_slice(slice: &'a [u8]) -> Self {
      50      1615391 :         Self::Slice(slice)
      51      1615391 :     }
      52              : 
      53              :     /// Creates a new [`bytes::Bytes`]-based view on the blob.
      54           12 :     pub fn new_bytes(bytes: bytes::Bytes) -> Self {
      55           12 :         Self::Bytes(bytes)
      56           12 :     }
      57              : 
      58              :     /// Convert the view into `Bytes`.
      59              :     ///
      60              :     /// If using slice as the underlying storage, the copy will be an O(n) operation.
      61     10261454 :     pub fn into_bytes(self) -> Bytes {
      62     10261454 :         match self {
      63     10261454 :             BufView::Slice(slice) => Bytes::copy_from_slice(slice),
      64            0 :             BufView::Bytes(bytes) => bytes,
      65              :         }
      66     10261454 :     }
      67              : 
      68              :     /// Creates a sub-view of the blob based on the range.
      69     23249138 :     fn view(&self, range: std::ops::Range<usize>) -> Self {
      70     23249138 :         match self {
      71     23249138 :             BufView::Slice(slice) => BufView::Slice(&slice[range]),
      72            0 :             BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
      73              :         }
      74     23249138 :     }
      75              : }
      76              : 
      77              : impl Deref for BufView<'_> {
      78              :     type Target = [u8];
      79              : 
      80     13012752 :     fn deref(&self) -> &Self::Target {
      81     13012752 :         match self {
      82     13012740 :             BufView::Slice(slice) => slice,
      83           12 :             BufView::Bytes(bytes) => bytes,
      84              :         }
      85     13012752 :     }
      86              : }
      87              : 
      88              : impl AsRef<[u8]> for BufView<'_> {
      89            0 :     fn as_ref(&self) -> &[u8] {
      90            0 :         match self {
      91            0 :             BufView::Slice(slice) => slice,
      92            0 :             BufView::Bytes(bytes) => bytes.as_ref(),
      93              :         }
      94            0 :     }
      95              : }
      96              : 
      97              : impl<'a> From<&'a [u8]> for BufView<'a> {
      98            0 :     fn from(value: &'a [u8]) -> Self {
      99            0 :         Self::new_slice(value)
     100            0 :     }
     101              : }
     102              : 
     103              : impl From<Bytes> for BufView<'_> {
     104            0 :     fn from(value: Bytes) -> Self {
     105            0 :         Self::new_bytes(value)
     106            0 :     }
     107              : }
     108              : 
     109              : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed,
     110              : /// subject to [`VectoredBlob::compression_bits`].
     111              : pub struct VectoredBlob {
     112              :     /// Blob metadata.
     113              :     pub meta: BlobMeta,
     114              :     /// Header start offset.
     115              :     header_start: usize,
     116              :     /// Data start offset.
     117              :     data_start: usize,
     118              :     /// End offset.
     119              :     end: usize,
     120              :     /// Compression used on the data, extracted from the header.
     121              :     compression_bits: u8,
     122              : }
     123              : 
     124              : impl VectoredBlob {
     125              :     /// Reads a decompressed view of the blob.
     126     23126162 :     pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
     127     23126162 :         let view = buf.view(self.data_start..self.end);
     128     23126162 : 
     129     23126162 :         match self.compression_bits {
     130     23126150 :             BYTE_UNCOMPRESSED => Ok(view),
     131              :             BYTE_ZSTD => {
     132           12 :                 let mut decompressed_vec = Vec::new();
     133           12 :                 let mut decoder =
     134           12 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     135           12 :                 decoder.write_all(&view).await?;
     136           12 :                 decoder.flush().await?;
     137              :                 // Zero-copy conversion from `Vec` to `Bytes`
     138           12 :                 Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
     139              :             }
     140            0 :             bits => {
     141            0 :                 let error = std::io::Error::new(
     142            0 :                     std::io::ErrorKind::InvalidData,
     143            0 :                     format!(
     144            0 :                         "Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
     145            0 :                         self.meta.key, self.meta.lsn, self.data_start, self.end
     146            0 :                     ),
     147            0 :                 );
     148            0 :                 Err(error)
     149              :             }
     150              :         }
     151     23126162 :     }
     152              : 
     153              :     /// Returns the raw blob including header.
     154       122976 :     pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
     155       122976 :         buf.view(self.header_start..self.end)
     156       122976 :     }
     157              : }
     158              : 
     159              : impl std::fmt::Display for VectoredBlob {
     160            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     161            0 :         write!(
     162            0 :             f,
     163            0 :             "{}@{}, {}..{}",
     164            0 :             self.meta.key, self.meta.lsn, self.data_start, self.end
     165            0 :         )
     166            0 :     }
     167              : }
     168              : 
     169              : /// Return type of [`VectoredBlobReader::read_blobs`]
     170              : pub struct VectoredBlobsBuf {
     171              :     /// Buffer for all blobs in this read
     172              :     pub buf: IoBufferMut,
     173              :     /// Offsets into the buffer and metadata for all blobs in this read
     174              :     pub blobs: Vec<VectoredBlob>,
     175              : }
     176              : 
     177              : /// Description of one disk read for multiple blobs.
     178              : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
     179              : #[derive(Debug)]
     180              : pub struct VectoredRead {
     181              :     pub start: u64,
     182              :     pub end: u64,
     183              :     /// Start offset and metadata for each blob in this read
     184              :     pub blobs_at: VecMap<u64, BlobMeta>,
     185              : }
     186              : 
     187              : impl VectoredRead {
     188      7334427 :     pub(crate) fn size(&self) -> usize {
     189      7334427 :         (self.end - self.start) as usize
     190      7334427 :     }
     191              : }
     192              : 
     193              : #[derive(Eq, PartialEq, Debug)]
     194              : pub(crate) enum VectoredReadExtended {
     195              :     Yes,
     196              :     No,
     197              : }
     198              : 
     199              : /// A vectored read builder that tries to coalesce all reads that fits in a chunk.
     200              : pub(crate) struct ChunkedVectoredReadBuilder {
     201              :     /// Start block number
     202              :     start_blk_no: usize,
     203              :     /// End block number (exclusive).
     204              :     end_blk_no: usize,
     205              :     /// Start offset and metadata for each blob in this read
     206              :     blobs_at: VecMap<u64, BlobMeta>,
     207              :     max_read_size: Option<usize>,
     208              : }
     209              : 
     210              : impl ChunkedVectoredReadBuilder {
     211              :     const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
     212              :     /// Start building a new vectored read.
     213              :     ///
     214              :     /// Note that by design, this does not check against reading more than `max_read_size` to
     215              :     /// support reading larger blobs than the configuration value. The builder will be single use
     216              :     /// however after that.
     217      1615847 :     fn new_impl(
     218      1615847 :         start_offset: u64,
     219      1615847 :         end_offset: u64,
     220      1615847 :         meta: BlobMeta,
     221      1615847 :         max_read_size: Option<usize>,
     222      1615847 :     ) -> Self {
     223      1615847 :         let mut blobs_at = VecMap::default();
     224      1615847 :         blobs_at
     225      1615847 :             .append(start_offset, meta)
     226      1615847 :             .expect("First insertion always succeeds");
     227      1615847 : 
     228      1615847 :         let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
     229      1615847 :         let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
     230      1615847 :         Self {
     231      1615847 :             start_blk_no,
     232      1615847 :             end_blk_no,
     233      1615847 :             blobs_at,
     234      1615847 :             max_read_size,
     235      1615847 :         }
     236      1615847 :     }
     237              : 
     238      1372475 :     pub(crate) fn new(
     239      1372475 :         start_offset: u64,
     240      1372475 :         end_offset: u64,
     241      1372475 :         meta: BlobMeta,
     242      1372475 :         max_read_size: usize,
     243      1372475 :     ) -> Self {
     244      1372475 :         Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
     245      1372475 :     }
     246              : 
     247       243372 :     pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
     248       243372 :         Self::new_impl(start_offset, end_offset, meta, None)
     249       243372 :     }
     250              : 
     251              :     /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
     252              :     ///
     253              :     /// The resulting size also must be below the max read size.
     254     21878131 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     255     21878131 :         tracing::trace!(start, end, "trying to extend");
     256     21878131 :         let start_blk_no = start as usize / Self::CHUNK_SIZE;
     257     21878131 :         let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
     258              : 
     259     21878131 :         let not_limited_by_max_read_size = {
     260     21878131 :             if let Some(max_read_size) = self.max_read_size {
     261      9353239 :                 let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
     262      9353239 :                 coalesced_size <= max_read_size
     263              :             } else {
     264     12524892 :                 true
     265              :             }
     266              :         };
     267              : 
     268              :         // True if the second block starts in the same block or the immediate next block where the first block ended.
     269              :         //
     270              :         // Note: This automatically handles the case where two blocks are adjacent to each other,
     271              :         // whether they starts on chunk size boundary or not.
     272     21878131 :         let is_adjacent_chunk_read = {
     273              :             // 1. first.end & second.start are in the same block
     274     21878131 :             self.end_blk_no == start_blk_no + 1 ||
     275              :             // 2. first.end ends one block before second.start
     276       262224 :             self.end_blk_no == start_blk_no
     277              :         };
     278              : 
     279     21878131 :         if is_adjacent_chunk_read && not_limited_by_max_read_size {
     280     21609291 :             self.end_blk_no = end_blk_no;
     281     21609291 :             self.blobs_at
     282     21609291 :                 .append(start, meta)
     283     21609291 :                 .expect("LSNs are ordered within vectored reads");
     284     21609291 : 
     285     21609291 :             return VectoredReadExtended::Yes;
     286       268840 :         }
     287       268840 : 
     288       268840 :         VectoredReadExtended::No
     289     21878131 :     }
     290              : 
     291     12764328 :     pub(crate) fn size(&self) -> usize {
     292     12764328 :         (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
     293     12764328 :     }
     294              : 
     295      1615847 :     pub(crate) fn build(self) -> VectoredRead {
     296      1615847 :         let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
     297      1615847 :         let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
     298      1615847 :         VectoredRead {
     299      1615847 :             start,
     300      1615847 :             end,
     301      1615847 :             blobs_at: self.blobs_at,
     302      1615847 :         }
     303      1615847 :     }
     304              : }
     305              : 
     306              : #[derive(Copy, Clone, Debug)]
     307              : pub enum BlobFlag {
     308              :     None,
     309              :     Ignore,
     310              :     ReplaceAll,
     311              : }
     312              : 
     313              : /// Planner for vectored blob reads.
     314              : ///
     315              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     316              : /// and coalesced into disk reads.
     317              : ///
     318              : /// The implementation is very simple:
     319              : /// * Collect all blob offsets in an ordered structure
     320              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     321              : pub struct VectoredReadPlanner {
     322              :     // Track all the blob offsets. Start offsets must be ordered.
     323              :     // Values in the value tuples are:
     324              :     // (
     325              :     //   lsn of the blob,
     326              :     //   start offset of the blob in the underlying file,
     327              :     //   end offset of the blob in the underlying file,
     328              :     //   whether the blob initializes the page image or not
     329              :     //   see [`pageserver_api::record::NeonWalRecord::will_init`]
     330              :     // )
     331              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
     332              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     333              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     334              : 
     335              :     max_read_size: usize,
     336              : }
     337              : 
     338              : impl VectoredReadPlanner {
     339      1600783 :     pub fn new(max_read_size: usize) -> Self {
     340      1600783 :         Self {
     341      1600783 :             blobs: BTreeMap::new(),
     342      1600783 :             prev: None,
     343      1600783 :             max_read_size,
     344      1600783 :         }
     345      1600783 :     }
     346              : 
     347              :     /// Include a new blob in the read plan.
     348              :     ///
     349              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     350              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     351              :     /// keys in a given keyspace. This function must be called for each key in the desired
     352              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     353              :     /// be called after every range in the offset.
     354              :     ///
     355              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     356              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     357              :     /// incorrect data to the user.
     358              :     ///
     359              :     /// The `flag` argument has two interesting values:
     360              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     361              :     ///   This is used for WAL records that `will_init`.
     362              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     363              :     ///   if the blob is cached.
     364     20999994 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     365              :         // Implementation note: internally lag behind by one blob such that
     366              :         // we have a start and end offset when initialising [`VectoredRead`]
     367     20999994 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     368              :             None => {
     369      1197019 :                 self.prev = Some((key, lsn, offset, flag));
     370      1197019 :                 return;
     371              :             }
     372     19802975 :             Some(prev) => prev,
     373     19802975 :         };
     374     19802975 : 
     375     19802975 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     376     19802975 : 
     377     19802975 :         self.prev = Some((key, lsn, offset, flag));
     378     20999994 :     }
     379              : 
     380      1719091 :     pub fn handle_range_end(&mut self, offset: u64) {
     381      1719091 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     382      1197019 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     383      1197019 :         }
     384              : 
     385      1719091 :         self.prev = None;
     386      1719091 :     }
     387              : 
     388     20999994 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     389     20999994 :         match flag {
     390     13551168 :             BlobFlag::None => {
     391     13551168 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     392     13551168 :                 blobs_for_key.push((lsn, start_offset, end_offset, false));
     393     13551168 :             }
     394      2053118 :             BlobFlag::ReplaceAll => {
     395      2053118 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     396      2053118 :                 blobs_for_key.clear();
     397      2053118 :                 blobs_for_key.push((lsn, start_offset, end_offset, true));
     398      2053118 :             }
     399      5395708 :             BlobFlag::Ignore => {}
     400              :         }
     401     20999994 :     }
     402              : 
     403      1600783 :     pub fn finish(self) -> Vec<VectoredRead> {
     404      1600783 :         let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
     405      1600783 :         let mut reads = Vec::new();
     406              : 
     407      3639801 :         for (key, blobs_for_key) in self.blobs {
     408     12471028 :             for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
     409     10432010 :                 let extended = match &mut current_read_builder {
     410      9353107 :                     Some(read_builder) => read_builder.extend(
     411      9353107 :                         start_offset,
     412      9353107 :                         end_offset,
     413      9353107 :                         BlobMeta {
     414      9353107 :                             key,
     415      9353107 :                             lsn,
     416      9353107 :                             will_init,
     417      9353107 :                         },
     418      9353107 :                     ),
     419      1078903 :                     None => VectoredReadExtended::No,
     420              :                 };
     421              : 
     422     10432010 :                 if extended == VectoredReadExtended::No {
     423      1347707 :                     let next_read_builder = ChunkedVectoredReadBuilder::new(
     424      1347707 :                         start_offset,
     425      1347707 :                         end_offset,
     426      1347707 :                         BlobMeta {
     427      1347707 :                             key,
     428      1347707 :                             lsn,
     429      1347707 :                             will_init,
     430      1347707 :                         },
     431      1347707 :                         self.max_read_size,
     432      1347707 :                     );
     433      1347707 : 
     434      1347707 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     435              : 
     436              :                     // `current_read_builder` is None in the first iteration of the outer loop
     437      1347707 :                     if let Some(read_builder) = prev_read_builder {
     438       268804 :                         reads.push(read_builder.build());
     439      1078903 :                     }
     440      9084303 :                 }
     441              :             }
     442              :         }
     443              : 
     444      1600783 :         if let Some(read_builder) = current_read_builder {
     445      1078903 :             reads.push(read_builder.build());
     446      1078903 :         }
     447              : 
     448      1600783 :         reads
     449      1600783 :     }
     450              : }
     451              : 
     452              : /// Disk reader for vectored blob spans (does not go through the page cache)
     453              : pub struct VectoredBlobReader<'a> {
     454              :     file: &'a VirtualFile,
     455              : }
     456              : 
     457              : impl<'a> VectoredBlobReader<'a> {
     458      1473623 :     pub fn new(file: &'a VirtualFile) -> Self {
     459      1473623 :         Self { file }
     460      1473623 :     }
     461              : 
     462              :     /// Read the requested blobs into the buffer.
     463              :     ///
     464              :     /// We have to deal with the fact that blobs are not fixed size.
     465              :     /// Each blob is prefixed by a size header.
     466              :     ///
     467              :     /// The success return value is a struct which contains the buffer
     468              :     /// filled from disk and a list of offsets at which each blob lies
     469              :     /// in the buffer.
     470      1615391 :     pub async fn read_blobs(
     471      1615391 :         &self,
     472      1615391 :         read: &VectoredRead,
     473      1615391 :         buf: IoBufferMut,
     474      1615391 :         ctx: &RequestContext,
     475      1615391 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     476      1615391 :         assert!(read.size() > 0);
     477      1615391 :         assert!(
     478      1615391 :             read.size() <= buf.capacity(),
     479            0 :             "{} > {}",
     480            0 :             read.size(),
     481            0 :             buf.capacity()
     482              :         );
     483              : 
     484      1615391 :         if cfg!(debug_assertions) {
     485      1615391 :             const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     486      1615391 :             debug_assert_eq!(
     487      1615391 :                 read.start % ALIGN,
     488              :                 0,
     489            0 :                 "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
     490              :                 read.start,
     491              :                 ALIGN
     492              :             );
     493            0 :         }
     494              : 
     495      1615391 :         let buf = self
     496      1615391 :             .file
     497      1615391 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     498      1615391 :             .await?
     499      1615391 :             .into_inner();
     500      1615391 : 
     501      1615391 :         let blobs_at = read.blobs_at.as_slice();
     502      1615391 : 
     503      1615391 :         let mut blobs = Vec::with_capacity(blobs_at.len());
     504              :         // Blobs in `read` only provide their starting offset. The end offset
     505              :         // of a blob is implicit: the start of the next blob if one exists
     506              :         // or the end of the read.
     507              : 
     508     23224466 :         for (blob_start, meta) in blobs_at.iter().copied() {
     509     23224466 :             let header_start = (blob_start - read.start) as usize;
     510     23224466 :             let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| {
     511            0 :                 std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err)
     512     23224466 :             })?;
     513     23224466 :             let data_start = header_start + header.header_len;
     514     23224466 :             let end = data_start + header.data_len;
     515     23224466 :             let compression_bits = header.compression_bits;
     516     23224466 : 
     517     23224466 :             blobs.push(VectoredBlob {
     518     23224466 :                 header_start,
     519     23224466 :                 data_start,
     520     23224466 :                 end,
     521     23224466 :                 meta,
     522     23224466 :                 compression_bits,
     523     23224466 :             });
     524              :         }
     525              : 
     526      1615391 :         Ok(VectoredBlobsBuf { buf, blobs })
     527      1615391 :     }
     528              : }
     529              : 
     530              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     531              : ///
     532              : /// It provides a streaming API for getting read blobs. It returns a batch when
     533              : /// `handle` gets called and when the current key would just exceed the read_size and
     534              : /// max_cnt constraints.
     535              : pub struct StreamingVectoredReadPlanner {
     536              :     read_builder: Option<ChunkedVectoredReadBuilder>,
     537              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     538              :     prev: Option<(Key, Lsn, u64, bool)>,
     539              :     /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
     540              :     /// we will produce a single batch instead of split them.
     541              :     max_read_size: u64,
     542              :     /// Max item count per batch
     543              :     max_cnt: usize,
     544              :     /// Size of the current batch
     545              :     cnt: usize,
     546              : }
     547              : 
     548              : impl StreamingVectoredReadPlanner {
     549         4956 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     550         4956 :         assert!(max_cnt > 0);
     551         4956 :         assert!(max_read_size > 0);
     552         4956 :         Self {
     553         4956 :             read_builder: None,
     554         4956 :             prev: None,
     555         4956 :             max_cnt,
     556         4956 :             max_read_size,
     557         4956 :             cnt: 0,
     558         4956 :         }
     559         4956 :     }
     560              : 
     561     12768600 :     pub fn handle(
     562     12768600 :         &mut self,
     563     12768600 :         key: Key,
     564     12768600 :         lsn: Lsn,
     565     12768600 :         offset: u64,
     566     12768600 :         will_init: bool,
     567     12768600 :     ) -> Option<VectoredRead> {
     568              :         // Implementation note: internally lag behind by one blob such that
     569              :         // we have a start and end offset when initialising [`VectoredRead`]
     570     12768600 :         let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
     571              :             None => {
     572         4272 :                 self.prev = Some((key, lsn, offset, will_init));
     573         4272 :                 return None;
     574              :             }
     575     12764328 :             Some(prev) => prev,
     576     12764328 :         };
     577     12764328 : 
     578     12764328 :         let res = self.add_blob(
     579     12764328 :             prev_key,
     580     12764328 :             prev_lsn,
     581     12764328 :             prev_offset,
     582     12764328 :             offset,
     583     12764328 :             false,
     584     12764328 :             prev_will_init,
     585     12764328 :         );
     586     12764328 : 
     587     12764328 :         self.prev = Some((key, lsn, offset, will_init));
     588     12764328 : 
     589     12764328 :         res
     590     12768600 :     }
     591              : 
     592         3948 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     593         3948 :         let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
     594         3936 :             self.add_blob(
     595         3936 :                 prev_key,
     596         3936 :                 prev_lsn,
     597         3936 :                 prev_offset,
     598         3936 :                 offset,
     599         3936 :                 true,
     600         3936 :                 prev_will_init,
     601         3936 :             )
     602              :         } else {
     603           12 :             None
     604              :         };
     605              : 
     606         3948 :         self.prev = None;
     607         3948 : 
     608         3948 :         res
     609         3948 :     }
     610              : 
     611     12768264 :     fn add_blob(
     612     12768264 :         &mut self,
     613     12768264 :         key: Key,
     614     12768264 :         lsn: Lsn,
     615     12768264 :         start_offset: u64,
     616     12768264 :         end_offset: u64,
     617     12768264 :         is_last_blob_in_read: bool,
     618     12768264 :         will_init: bool,
     619     12768264 :     ) -> Option<VectoredRead> {
     620     12768264 :         match &mut self.read_builder {
     621     12524892 :             Some(read_builder) => {
     622     12524892 :                 let extended = read_builder.extend(
     623     12524892 :                     start_offset,
     624     12524892 :                     end_offset,
     625     12524892 :                     BlobMeta {
     626     12524892 :                         key,
     627     12524892 :                         lsn,
     628     12524892 :                         will_init,
     629     12524892 :                     },
     630     12524892 :                 );
     631     12524892 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     632              :             }
     633       243372 :             None => {
     634       243372 :                 self.read_builder = {
     635       243372 :                     Some(ChunkedVectoredReadBuilder::new_streaming(
     636       243372 :                         start_offset,
     637       243372 :                         end_offset,
     638       243372 :                         BlobMeta {
     639       243372 :                             key,
     640       243372 :                             lsn,
     641       243372 :                             will_init,
     642       243372 :                         },
     643       243372 :                     ))
     644       243372 :                 };
     645       243372 :             }
     646              :         }
     647     12768264 :         let read_builder = self.read_builder.as_mut().unwrap();
     648     12768264 :         self.cnt += 1;
     649     12768264 :         if is_last_blob_in_read
     650     12764328 :             || read_builder.size() >= self.max_read_size as usize
     651     12591264 :             || self.cnt >= self.max_cnt
     652              :         {
     653       243372 :             let prev_read_builder = self.read_builder.take();
     654       243372 :             self.cnt = 0;
     655              : 
     656              :             // `current_read_builder` is None in the first iteration
     657       243372 :             if let Some(read_builder) = prev_read_builder {
     658       243372 :                 return Some(read_builder.build());
     659            0 :             }
     660     12524892 :         }
     661     12524892 :         None
     662     12768264 :     }
     663              : }
     664              : 
     665              : #[cfg(test)]
     666              : mod tests {
     667              : 
     668              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     669              :     use super::*;
     670              :     use crate::context::DownloadBehavior;
     671              :     use crate::page_cache::PAGE_SZ;
     672              :     use crate::task_mgr::TaskKind;
     673              : 
     674          288 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     675              :         const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     676          288 :         assert_eq!(read.start % ALIGN, 0);
     677          288 :         assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
     678              : 
     679          504 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     680          288 : 
     681          288 :         let offsets_in_read: Vec<_> = read
     682          288 :             .blobs_at
     683          288 :             .as_slice()
     684          288 :             .iter()
     685          504 :             .map(|(offset, _)| *offset)
     686          288 :             .collect();
     687          288 : 
     688          288 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     689          288 :     }
     690              : 
     691              :     #[test]
     692           12 :     fn planner_chunked_coalesce_all_test() {
     693              :         use crate::virtual_file;
     694              : 
     695              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     696              : 
     697           12 :         let max_read_size = CHUNK_SIZE as usize * 8;
     698           12 :         let key = Key::MIN;
     699           12 :         let lsn = Lsn(0);
     700           12 : 
     701           12 :         let blob_descriptions = [
     702           12 :             (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
     703           12 :             (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
     704           12 :             (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
     705           12 :             (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
     706           12 :             (key, lsn, CHUNK_SIZE, BlobFlag::None),
     707           12 :             (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
     708           12 :             (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
     709           12 :             (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
     710           12 :             (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
     711           12 :             (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
     712           12 :             (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
     713           12 :             (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
     714           12 :             (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
     715           12 :             (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
     716           12 :         ];
     717           12 : 
     718           12 :         let ranges = [
     719           12 :             &[
     720           12 :                 blob_descriptions[0],
     721           12 :                 blob_descriptions[2],
     722           12 :                 blob_descriptions[4],
     723           12 :                 blob_descriptions[5],
     724           12 :                 blob_descriptions[7],
     725           12 :                 blob_descriptions[8],
     726           12 :                 blob_descriptions[10],
     727           12 :             ],
     728           12 :             &blob_descriptions[11..12],
     729           12 :             &blob_descriptions[13..],
     730           12 :         ];
     731           12 : 
     732           12 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     733          180 :         for (key, lsn, offset, flag) in blob_descriptions {
     734          168 :             planner.handle(key, lsn, offset, flag);
     735          168 :         }
     736              : 
     737           12 :         planner.handle_range_end(652 * 1024);
     738           12 : 
     739           12 :         let reads = planner.finish();
     740           12 : 
     741           12 :         assert_eq!(reads.len(), ranges.len());
     742              : 
     743           36 :         for (idx, read) in reads.iter().enumerate() {
     744           36 :             validate_read(read, ranges[idx]);
     745           36 :         }
     746           12 :     }
     747              : 
     748              :     #[test]
     749           12 :     fn planner_max_read_size_test() {
     750           12 :         let max_read_size = 128 * 1024;
     751           12 :         let key = Key::MIN;
     752           12 :         let lsn = Lsn(0);
     753           12 : 
     754           12 :         let blob_descriptions = vec![
     755           12 :             (key, lsn, 0, BlobFlag::None),
     756           12 :             (key, lsn, 32 * 1024, BlobFlag::None),
     757           12 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     758           12 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     759           12 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     760           12 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     761           12 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     762           12 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     763           12 :         ];
     764           12 : 
     765           12 :         let ranges = [
     766           12 :             &blob_descriptions[0..3],
     767           12 :             &blob_descriptions[3..4],
     768           12 :             &blob_descriptions[4..5],
     769           12 :             &blob_descriptions[5..6],
     770           12 :             &blob_descriptions[6..7],
     771           12 :             &blob_descriptions[7..],
     772           12 :         ];
     773           12 : 
     774           12 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     775           96 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     776           96 :             planner.handle(key, lsn, offset, flag);
     777           96 :         }
     778              : 
     779           12 :         planner.handle_range_end(652 * 1024);
     780           12 : 
     781           12 :         let reads = planner.finish();
     782           12 : 
     783           12 :         assert_eq!(reads.len(), 6);
     784              : 
     785              :         // TODO: could remove zero reads to produce 5 reads here
     786              : 
     787           72 :         for (idx, read) in reads.iter().enumerate() {
     788           72 :             validate_read(read, ranges[idx]);
     789           72 :         }
     790           12 :     }
     791              : 
     792              :     #[test]
     793           12 :     fn planner_replacement_test() {
     794              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     795           12 :         let max_read_size = 128 * CHUNK_SIZE as usize;
     796           12 :         let first_key = Key::MIN;
     797           12 :         let second_key = first_key.next();
     798           12 :         let lsn = Lsn(0);
     799           12 : 
     800           12 :         let blob_descriptions = vec![
     801           12 :             (first_key, lsn, 0, BlobFlag::None),          // First in read 1
     802           12 :             (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
     803           12 :             (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
     804           12 :             (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
     805           12 :             (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
     806           12 :             (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None),       // Last in read 2
     807           12 :         ];
     808           12 : 
     809           12 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     810           12 : 
     811           12 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     812           72 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     813           72 :             planner.handle(key, lsn, offset, flag);
     814           72 :         }
     815              : 
     816           12 :         planner.handle_range_end(6 * CHUNK_SIZE);
     817           12 : 
     818           12 :         let reads = planner.finish();
     819           12 :         assert_eq!(reads.len(), 2);
     820              : 
     821           24 :         for (idx, read) in reads.iter().enumerate() {
     822           24 :             validate_read(read, ranges[idx]);
     823           24 :         }
     824           12 :     }
     825              : 
     826              :     #[test]
     827           12 :     fn streaming_planner_max_read_size_test() {
     828           12 :         let max_read_size = 128 * 1024;
     829           12 :         let key = Key::MIN;
     830           12 :         let lsn = Lsn(0);
     831           12 : 
     832           12 :         let blob_descriptions = vec![
     833           12 :             (key, lsn, 0, BlobFlag::None),
     834           12 :             (key, lsn, 32 * 1024, BlobFlag::None),
     835           12 :             (key, lsn, 96 * 1024, BlobFlag::None),
     836           12 :             (key, lsn, 128 * 1024, BlobFlag::None),
     837           12 :             (key, lsn, 198 * 1024, BlobFlag::None),
     838           12 :             (key, lsn, 268 * 1024, BlobFlag::None),
     839           12 :             (key, lsn, 396 * 1024, BlobFlag::None),
     840           12 :             (key, lsn, 652 * 1024, BlobFlag::None),
     841           12 :         ];
     842           12 : 
     843           12 :         let ranges = [
     844           12 :             &blob_descriptions[0..3],
     845           12 :             &blob_descriptions[3..5],
     846           12 :             &blob_descriptions[5..6],
     847           12 :             &blob_descriptions[6..7],
     848           12 :             &blob_descriptions[7..],
     849           12 :         ];
     850           12 : 
     851           12 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     852           12 :         let mut reads = Vec::new();
     853           96 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     854           96 :             reads.extend(planner.handle(key, lsn, offset, false));
     855           96 :         }
     856           12 :         reads.extend(planner.handle_range_end(652 * 1024));
     857           12 : 
     858           12 :         assert_eq!(reads.len(), ranges.len());
     859              : 
     860           60 :         for (idx, read) in reads.iter().enumerate() {
     861           60 :             validate_read(read, ranges[idx]);
     862           60 :         }
     863           12 :     }
     864              : 
     865              :     #[test]
     866           12 :     fn streaming_planner_max_cnt_test() {
     867           12 :         let max_read_size = 1024 * 1024;
     868           12 :         let key = Key::MIN;
     869           12 :         let lsn = Lsn(0);
     870           12 : 
     871           12 :         let blob_descriptions = vec![
     872           12 :             (key, lsn, 0, BlobFlag::None),
     873           12 :             (key, lsn, 32 * 1024, BlobFlag::None),
     874           12 :             (key, lsn, 96 * 1024, BlobFlag::None),
     875           12 :             (key, lsn, 128 * 1024, BlobFlag::None),
     876           12 :             (key, lsn, 198 * 1024, BlobFlag::None),
     877           12 :             (key, lsn, 268 * 1024, BlobFlag::None),
     878           12 :             (key, lsn, 396 * 1024, BlobFlag::None),
     879           12 :             (key, lsn, 652 * 1024, BlobFlag::None),
     880           12 :         ];
     881           12 : 
     882           12 :         let ranges = [
     883           12 :             &blob_descriptions[0..2],
     884           12 :             &blob_descriptions[2..4],
     885           12 :             &blob_descriptions[4..6],
     886           12 :             &blob_descriptions[6..],
     887           12 :         ];
     888           12 : 
     889           12 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     890           12 :         let mut reads = Vec::new();
     891           96 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     892           96 :             reads.extend(planner.handle(key, lsn, offset, false));
     893           96 :         }
     894           12 :         reads.extend(planner.handle_range_end(652 * 1024));
     895           12 : 
     896           12 :         assert_eq!(reads.len(), ranges.len());
     897              : 
     898           48 :         for (idx, read) in reads.iter().enumerate() {
     899           48 :             validate_read(read, ranges[idx]);
     900           48 :         }
     901           12 :     }
     902              : 
     903              :     #[test]
     904           12 :     fn streaming_planner_edge_test() {
     905           12 :         let max_read_size = 1024 * 1024;
     906           12 :         let key = Key::MIN;
     907           12 :         let lsn = Lsn(0);
     908           12 :         {
     909           12 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     910           12 :             let mut reads = Vec::new();
     911           12 :             reads.extend(planner.handle_range_end(652 * 1024));
     912           12 :             assert!(reads.is_empty());
     913              :         }
     914              :         {
     915           12 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     916           12 :             let mut reads = Vec::new();
     917           12 :             reads.extend(planner.handle(key, lsn, 0, false));
     918           12 :             reads.extend(planner.handle_range_end(652 * 1024));
     919           12 :             assert_eq!(reads.len(), 1);
     920           12 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     921           12 :         }
     922           12 :         {
     923           12 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     924           12 :             let mut reads = Vec::new();
     925           12 :             reads.extend(planner.handle(key, lsn, 0, false));
     926           12 :             reads.extend(planner.handle(key, lsn, 128 * 1024, false));
     927           12 :             reads.extend(planner.handle_range_end(652 * 1024));
     928           12 :             assert_eq!(reads.len(), 2);
     929           12 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     930           12 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     931           12 :         }
     932           12 :         {
     933           12 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     934           12 :             let mut reads = Vec::new();
     935           12 :             reads.extend(planner.handle(key, lsn, 0, false));
     936           12 :             reads.extend(planner.handle(key, lsn, 128 * 1024, false));
     937           12 :             reads.extend(planner.handle_range_end(652 * 1024));
     938           12 :             assert_eq!(reads.len(), 1);
     939           12 :             validate_read(
     940           12 :                 &reads[0],
     941           12 :                 &[
     942           12 :                     (key, lsn, 0, BlobFlag::None),
     943           12 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     944           12 :                 ],
     945           12 :             );
     946           12 :         }
     947           12 :     }
     948              : 
     949           48 :     async fn round_trip_test_compressed(
     950           48 :         blobs: &[Vec<u8>],
     951           48 :         compression: bool,
     952           48 :     ) -> anyhow::Result<()> {
     953           48 :         let ctx =
     954           48 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     955           48 :         let (_temp_dir, pathbuf, offsets) =
     956           48 :             write_maybe_compressed(blobs, compression, &ctx).await?;
     957              : 
     958           48 :         let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
     959           48 :         let file_len = std::fs::metadata(&pathbuf)?.len();
     960           48 : 
     961           48 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
     962        24720 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
     963           48 :         let mut buf = IoBufferMut::with_capacity(reserved_bytes);
     964           48 : 
     965           48 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
     966           48 :         let meta = BlobMeta {
     967           48 :             key: Key::MIN,
     968           48 :             lsn: Lsn(0),
     969           48 :             will_init: false,
     970           48 :         };
     971              : 
     972        24720 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     973        24720 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
     974        24720 :             if idx + 1 == offsets.len() {
     975           48 :                 continue;
     976        24672 :             }
     977        24672 :             let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
     978        24672 :             let read = read_builder.build();
     979        24672 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
     980        24672 :             assert_eq!(result.blobs.len(), 1);
     981        24672 :             let read_blob = &result.blobs[0];
     982        24672 :             let view = BufView::new_slice(&result.buf);
     983        24672 :             let read_buf = read_blob.read(&view).await?;
     984        24672 :             assert_eq!(
     985        24672 :                 &blob[..],
     986        24672 :                 &read_buf[..],
     987            0 :                 "mismatch for idx={idx} at offset={offset}"
     988              :             );
     989              : 
     990              :             // Check that raw_with_header returns a valid header.
     991        24672 :             let raw = read_blob.raw_with_header(&view);
     992        24672 :             let header = Header::decode(&raw)?;
     993        24672 :             if !compression || header.header_len == 1 {
     994        12456 :                 assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED);
     995        12216 :             }
     996        24672 :             assert_eq!(raw.len(), header.total_len());
     997              : 
     998        24672 :             buf = result.buf;
     999              :         }
    1000           48 :         Ok(())
    1001           48 :     }
    1002              : 
    1003              :     #[tokio::test]
    1004           12 :     async fn test_really_big_array() -> anyhow::Result<()> {
    1005           12 :         let blobs = &[
    1006           12 :             b"test".to_vec(),
    1007           12 :             random_array(10 * PAGE_SZ),
    1008           12 :             b"hello".to_vec(),
    1009           12 :             random_array(66 * PAGE_SZ),
    1010           12 :             vec![0xf3; 24 * PAGE_SZ],
    1011           12 :             b"foobar".to_vec(),
    1012           12 :         ];
    1013           12 :         round_trip_test_compressed(blobs, false).await?;
    1014           12 :         round_trip_test_compressed(blobs, true).await?;
    1015           12 :         Ok(())
    1016           12 :     }
    1017              : 
    1018              :     #[tokio::test]
    1019           12 :     async fn test_arrays_inc() -> anyhow::Result<()> {
    1020           12 :         let blobs = (0..PAGE_SZ / 8)
    1021        12288 :             .map(|v| random_array(v * 16))
    1022           12 :             .collect::<Vec<_>>();
    1023           12 :         round_trip_test_compressed(&blobs, false).await?;
    1024           12 :         round_trip_test_compressed(&blobs, true).await?;
    1025           12 :         Ok(())
    1026           12 :     }
    1027              : }
        

Generated by: LCOV version 2.1-beta