LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 95.0 % 660 627
Test Date: 2025-02-20 13:11:02 Functions: 91.5 % 47 43

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
      12              : //! adjacent blocks into a single disk IO request and exectuted by
      13              : //! [`VectoredBlobReader`] which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::ops::Deref;
      20              : 
      21              : use bytes::Bytes;
      22              : use pageserver_api::key::Key;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::BoundedBuf;
      25              : use utils::lsn::Lsn;
      26              : use utils::vec_map::VecMap;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
      30              : use crate::virtual_file::IoBufferMut;
      31              : use crate::virtual_file::{self, VirtualFile};
      32              : 
      33              : /// Metadata bundled with the start and end offset of a blob.
      34              : #[derive(Copy, Clone, Debug)]
      35              : pub struct BlobMeta {
      36              :     pub key: Key,
      37              :     pub lsn: Lsn,
      38              :     pub will_init: bool,
      39              : }
      40              : 
      41              : /// A view into the vectored blobs read buffer.
      42              : #[derive(Clone, Debug)]
      43              : pub(crate) enum BufView<'a> {
      44              :     Slice(&'a [u8]),
      45              :     Bytes(bytes::Bytes),
      46              : }
      47              : 
      48              : impl<'a> BufView<'a> {
      49              :     /// Creates a new slice-based view on the blob.
      50       441261 :     pub fn new_slice(slice: &'a [u8]) -> Self {
      51       441261 :         Self::Slice(slice)
      52       441261 :     }
      53              : 
      54              :     /// Creates a new [`bytes::Bytes`]-based view on the blob.
      55            4 :     pub fn new_bytes(bytes: bytes::Bytes) -> Self {
      56            4 :         Self::Bytes(bytes)
      57            4 :     }
      58              : 
      59              :     /// Convert the view into `Bytes`.
      60              :     ///
      61              :     /// If using slice as the underlying storage, the copy will be an O(n) operation.
      62      1066050 :     pub fn into_bytes(self) -> Bytes {
      63      1066050 :         match self {
      64      1066050 :             BufView::Slice(slice) => Bytes::copy_from_slice(slice),
      65            0 :             BufView::Bytes(bytes) => bytes,
      66              :         }
      67      1066050 :     }
      68              : 
      69              :     /// Creates a sub-view of the blob based on the range.
      70      5387042 :     fn view(&self, range: std::ops::Range<usize>) -> Self {
      71      5387042 :         match self {
      72      5387042 :             BufView::Slice(slice) => BufView::Slice(&slice[range]),
      73            0 :             BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
      74              :         }
      75      5387042 :     }
      76              : }
      77              : 
      78              : impl Deref for BufView<'_> {
      79              :     type Target = [u8];
      80              : 
      81      4321124 :     fn deref(&self) -> &Self::Target {
      82      4321124 :         match self {
      83      4321120 :             BufView::Slice(slice) => slice,
      84            4 :             BufView::Bytes(bytes) => bytes,
      85              :         }
      86      4321124 :     }
      87              : }
      88              : 
      89              : impl AsRef<[u8]> for BufView<'_> {
      90            0 :     fn as_ref(&self) -> &[u8] {
      91            0 :         match self {
      92            0 :             BufView::Slice(slice) => slice,
      93            0 :             BufView::Bytes(bytes) => bytes.as_ref(),
      94              :         }
      95            0 :     }
      96              : }
      97              : 
      98              : impl<'a> From<&'a [u8]> for BufView<'a> {
      99            0 :     fn from(value: &'a [u8]) -> Self {
     100            0 :         Self::new_slice(value)
     101            0 :     }
     102              : }
     103              : 
     104              : impl From<Bytes> for BufView<'_> {
     105            0 :     fn from(value: Bytes) -> Self {
     106            0 :         Self::new_bytes(value)
     107            0 :     }
     108              : }
     109              : 
     110              : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte ranges is potentially compressed,
     111              : /// subject to [`VectoredBlob::compression_bits`].
     112              : pub struct VectoredBlob {
     113              :     /// Blob metadata.
     114              :     pub meta: BlobMeta,
     115              :     /// Start offset.
     116              :     start: usize,
     117              :     /// End offset.
     118              :     end: usize,
     119              :     /// Compression used on the the blob.
     120              :     compression_bits: u8,
     121              : }
     122              : 
     123              : impl VectoredBlob {
     124              :     /// Reads a decompressed view of the blob.
     125      5387042 :     pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
     126      5387042 :         let view = buf.view(self.start..self.end);
     127      5387042 : 
     128      5387042 :         match self.compression_bits {
     129      5387038 :             BYTE_UNCOMPRESSED => Ok(view),
     130              :             BYTE_ZSTD => {
     131            4 :                 let mut decompressed_vec = Vec::new();
     132            4 :                 let mut decoder =
     133            4 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     134            4 :                 decoder.write_all(&view).await?;
     135            4 :                 decoder.flush().await?;
     136              :                 // Zero-copy conversion from `Vec` to `Bytes`
     137            4 :                 Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
     138              :             }
     139            0 :             bits => {
     140            0 :                 let error = std::io::Error::new(
     141            0 :                     std::io::ErrorKind::InvalidData,
     142            0 :                     format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
     143            0 :                 );
     144            0 :                 Err(error)
     145              :             }
     146              :         }
     147      5387042 :     }
     148              : }
     149              : 
     150              : impl std::fmt::Display for VectoredBlob {
     151            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     152            0 :         write!(
     153            0 :             f,
     154            0 :             "{}@{}, {}..{}",
     155            0 :             self.meta.key, self.meta.lsn, self.start, self.end
     156            0 :         )
     157            0 :     }
     158              : }
     159              : 
     160              : /// Return type of [`VectoredBlobReader::read_blobs`]
     161              : pub struct VectoredBlobsBuf {
     162              :     /// Buffer for all blobs in this read
     163              :     pub buf: IoBufferMut,
     164              :     /// Offsets into the buffer and metadata for all blobs in this read
     165              :     pub blobs: Vec<VectoredBlob>,
     166              : }
     167              : 
     168              : /// Description of one disk read for multiple blobs.
     169              : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
     170              : #[derive(Debug)]
     171              : pub struct VectoredRead {
     172              :     pub start: u64,
     173              :     pub end: u64,
     174              :     /// Start offset and metadata for each blob in this read
     175              :     pub blobs_at: VecMap<u64, BlobMeta>,
     176              : }
     177              : 
     178              : impl VectoredRead {
     179      2018615 :     pub(crate) fn size(&self) -> usize {
     180      2018615 :         (self.end - self.start) as usize
     181      2018615 :     }
     182              : }
     183              : 
     184              : #[derive(Eq, PartialEq, Debug)]
     185              : pub(crate) enum VectoredReadExtended {
     186              :     Yes,
     187              :     No,
     188              : }
     189              : 
     190              : /// A vectored read builder that tries to coalesce all reads that fits in a chunk.
     191              : pub(crate) struct ChunkedVectoredReadBuilder {
     192              :     /// Start block number
     193              :     start_blk_no: usize,
     194              :     /// End block number (exclusive).
     195              :     end_blk_no: usize,
     196              :     /// Start offset and metadata for each blob in this read
     197              :     blobs_at: VecMap<u64, BlobMeta>,
     198              :     max_read_size: Option<usize>,
     199              : }
     200              : 
     201              : impl ChunkedVectoredReadBuilder {
     202              :     const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
     203              :     /// Start building a new vectored read.
     204              :     ///
     205              :     /// Note that by design, this does not check against reading more than `max_read_size` to
     206              :     /// support reading larger blobs than the configuration value. The builder will be single use
     207              :     /// however after that.
     208       441413 :     fn new_impl(
     209       441413 :         start_offset: u64,
     210       441413 :         end_offset: u64,
     211       441413 :         meta: BlobMeta,
     212       441413 :         max_read_size: Option<usize>,
     213       441413 :     ) -> Self {
     214       441413 :         let mut blobs_at = VecMap::default();
     215       441413 :         blobs_at
     216       441413 :             .append(start_offset, meta)
     217       441413 :             .expect("First insertion always succeeds");
     218       441413 : 
     219       441413 :         let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
     220       441413 :         let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
     221       441413 :         Self {
     222       441413 :             start_blk_no,
     223       441413 :             end_blk_no,
     224       441413 :             blobs_at,
     225       441413 :             max_read_size,
     226       441413 :         }
     227       441413 :     }
     228              : 
     229       360301 :     pub(crate) fn new(
     230       360301 :         start_offset: u64,
     231       360301 :         end_offset: u64,
     232       360301 :         meta: BlobMeta,
     233       360301 :         max_read_size: usize,
     234       360301 :     ) -> Self {
     235       360301 :         Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
     236       360301 :     }
     237              : 
     238        81112 :     pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
     239        81112 :         Self::new_impl(start_offset, end_offset, meta, None)
     240        81112 :     }
     241              : 
     242              :     /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
     243              :     ///
     244              :     /// The resulting size also must be below the max read size.
     245      4991047 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     246      4991047 :         tracing::trace!(start, end, "trying to extend");
     247      4991047 :         let start_blk_no = start as usize / Self::CHUNK_SIZE;
     248      4991047 :         let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
     249              : 
     250      4991047 :         let not_limited_by_max_read_size = {
     251      4991047 :             if let Some(max_read_size) = self.max_read_size {
     252       816151 :                 let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
     253       816151 :                 coalesced_size <= max_read_size
     254              :             } else {
     255      4174896 :                 true
     256              :             }
     257              :         };
     258              : 
     259              :         // True if the second block starts in the same block or the immediate next block where the first block ended.
     260              :         //
     261              :         // Note: This automatically handles the case where two blocks are adjacent to each other,
     262              :         // whether they starts on chunk size boundary or not.
     263      4991047 :         let is_adjacent_chunk_read = {
     264              :             // 1. first.end & second.start are in the same block
     265      4991047 :             self.end_blk_no == start_blk_no + 1 ||
     266              :             // 2. first.end ends one block before second.start
     267        26641 :             self.end_blk_no == start_blk_no
     268              :         };
     269              : 
     270      4991047 :         if is_adjacent_chunk_read && not_limited_by_max_read_size {
     271      4945853 :             self.end_blk_no = end_blk_no;
     272      4945853 :             self.blobs_at
     273      4945853 :                 .append(start, meta)
     274      4945853 :                 .expect("LSNs are ordered within vectored reads");
     275      4945853 : 
     276      4945853 :             return VectoredReadExtended::Yes;
     277        45194 :         }
     278        45194 : 
     279        45194 :         VectoredReadExtended::No
     280      4991047 :     }
     281              : 
     282      4254708 :     pub(crate) fn size(&self) -> usize {
     283      4254708 :         (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
     284      4254708 :     }
     285              : 
     286       441413 :     pub(crate) fn build(self) -> VectoredRead {
     287       441413 :         let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
     288       441413 :         let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
     289       441413 :         VectoredRead {
     290       441413 :             start,
     291       441413 :             end,
     292       441413 :             blobs_at: self.blobs_at,
     293       441413 :         }
     294       441413 :     }
     295              : }
     296              : 
     297              : #[derive(Copy, Clone, Debug)]
     298              : pub enum BlobFlag {
     299              :     None,
     300              :     Ignore,
     301              :     ReplaceAll,
     302              : }
     303              : 
     304              : /// Planner for vectored blob reads.
     305              : ///
     306              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     307              : /// and coalesced into disk reads.
     308              : ///
     309              : /// The implementation is very simple:
     310              : /// * Collect all blob offsets in an ordered structure
     311              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     312              : pub struct VectoredReadPlanner {
     313              :     // Track all the blob offsets. Start offsets must be ordered.
     314              :     // Values in the value tuples are:
     315              :     // (
     316              :     //   lsn of the blob,
     317              :     //   start offset of the blob in the underlying file,
     318              :     //   end offset of the blob in the underlying file,
     319              :     //   whether the blob initializes the page image or not
     320              :     //   see [`pageserver_api::record::NeonWalRecord::will_init`]
     321              :     // )
     322              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
     323              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     324              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     325              : 
     326              :     max_read_size: usize,
     327              : }
     328              : 
     329              : impl VectoredReadPlanner {
     330       479872 :     pub fn new(max_read_size: usize) -> Self {
     331       479872 :         Self {
     332       479872 :             blobs: BTreeMap::new(),
     333       479872 :             prev: None,
     334       479872 :             max_read_size,
     335       479872 :         }
     336       479872 :     }
     337              : 
     338              :     /// Include a new blob in the read plan.
     339              :     ///
     340              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     341              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     342              :     /// keys in a given keyspace. This function must be called for each key in the desired
     343              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     344              :     /// be called after every range in the offset.
     345              :     ///
     346              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     347              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     348              :     /// incorrect data to the user.
     349              :     ///
     350              :     /// The `flag` argument has two interesting values:
     351              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     352              :     ///   This is used for WAL records that `will_init`.
     353              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     354              :     ///   if the blob is cached.
     355      3043423 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     356              :         // Implementation note: internally lag behind by one blob such that
     357              :         // we have a start and end offset when initialising [`VectoredRead`]
     358      3043423 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     359              :             None => {
     360       307279 :                 self.prev = Some((key, lsn, offset, flag));
     361       307279 :                 return;
     362              :             }
     363      2736144 :             Some(prev) => prev,
     364      2736144 :         };
     365      2736144 : 
     366      2736144 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     367      2736144 : 
     368      2736144 :         self.prev = Some((key, lsn, offset, flag));
     369      3043423 :     }
     370              : 
     371       480352 :     pub fn handle_range_end(&mut self, offset: u64) {
     372       480352 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     373       307279 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     374       307279 :         }
     375              : 
     376       480352 :         self.prev = None;
     377       480352 :     }
     378              : 
     379      3043423 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     380      3043423 :         match flag {
     381       749516 :             BlobFlag::None => {
     382       749516 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     383       749516 :                 blobs_for_key.push((lsn, start_offset, end_offset, false));
     384       749516 :             }
     385       641087 :             BlobFlag::ReplaceAll => {
     386       641087 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     387       641087 :                 blobs_for_key.clear();
     388       641087 :                 blobs_for_key.push((lsn, start_offset, end_offset, true));
     389       641087 :             }
     390      1652820 :             BlobFlag::Ignore => {}
     391              :         }
     392      3043423 :     }
     393              : 
     394       479872 :     pub fn finish(self) -> Vec<VectoredRead> {
     395       479872 :         let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
     396       479872 :         let mut reads = Vec::new();
     397              : 
     398      1500406 :         for (key, blobs_for_key) in self.blobs {
     399      2143504 :             for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
     400      1122970 :                 let extended = match &mut current_read_builder {
     401       816107 :                     Some(read_builder) => read_builder.extend(
     402       816107 :                         start_offset,
     403       816107 :                         end_offset,
     404       816107 :                         BlobMeta {
     405       816107 :                             key,
     406       816107 :                             lsn,
     407       816107 :                             will_init,
     408       816107 :                         },
     409       816107 :                     ),
     410       306863 :                     None => VectoredReadExtended::No,
     411              :                 };
     412              : 
     413      1122970 :                 if extended == VectoredReadExtended::No {
     414       352045 :                     let next_read_builder = ChunkedVectoredReadBuilder::new(
     415       352045 :                         start_offset,
     416       352045 :                         end_offset,
     417       352045 :                         BlobMeta {
     418       352045 :                             key,
     419       352045 :                             lsn,
     420       352045 :                             will_init,
     421       352045 :                         },
     422       352045 :                         self.max_read_size,
     423       352045 :                     );
     424       352045 : 
     425       352045 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     426              : 
     427              :                     // `current_read_builder` is None in the first iteration of the outer loop
     428       352045 :                     if let Some(read_builder) = prev_read_builder {
     429        45182 :                         reads.push(read_builder.build());
     430       306863 :                     }
     431       770925 :                 }
     432              :             }
     433              :         }
     434              : 
     435       479872 :         if let Some(read_builder) = current_read_builder {
     436       306863 :             reads.push(read_builder.build());
     437       306863 :         }
     438              : 
     439       479872 :         reads
     440       479872 :     }
     441              : }
     442              : 
     443              : /// Disk reader for vectored blob spans (does not go through the page cache)
     444              : pub struct VectoredBlobReader<'a> {
     445              :     file: &'a VirtualFile,
     446              : }
     447              : 
     448              : impl<'a> VectoredBlobReader<'a> {
     449       393989 :     pub fn new(file: &'a VirtualFile) -> Self {
     450       393989 :         Self { file }
     451       393989 :     }
     452              : 
     453              :     /// Read the requested blobs into the buffer.
     454              :     ///
     455              :     /// We have to deal with the fact that blobs are not fixed size.
     456              :     /// Each blob is prefixed by a size header.
     457              :     ///
     458              :     /// The success return value is a struct which contains the buffer
     459              :     /// filled from disk and a list of offsets at which each blob lies
     460              :     /// in the buffer.
     461       441261 :     pub async fn read_blobs(
     462       441261 :         &self,
     463       441261 :         read: &VectoredRead,
     464       441261 :         buf: IoBufferMut,
     465       441261 :         ctx: &RequestContext,
     466       441261 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     467       441261 :         assert!(read.size() > 0);
     468       441261 :         assert!(
     469       441261 :             read.size() <= buf.capacity(),
     470            0 :             "{} > {}",
     471            0 :             read.size(),
     472            0 :             buf.capacity()
     473              :         );
     474              : 
     475       441261 :         if cfg!(debug_assertions) {
     476       441261 :             const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     477       441261 :             debug_assert_eq!(
     478       441261 :                 read.start % ALIGN,
     479              :                 0,
     480            0 :                 "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
     481              :                 read.start,
     482              :                 ALIGN
     483              :             );
     484            0 :         }
     485              : 
     486       441261 :         let buf = self
     487       441261 :             .file
     488       441261 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     489       441261 :             .await?
     490       441261 :             .into_inner();
     491       441261 : 
     492       441261 :         let blobs_at = read.blobs_at.as_slice();
     493       441261 : 
     494       441261 :         let start_offset = read.start;
     495       441261 : 
     496       441261 :         let mut metas = Vec::with_capacity(blobs_at.len());
     497              :         // Blobs in `read` only provide their starting offset. The end offset
     498              :         // of a blob is implicit: the start of the next blob if one exists
     499              :         // or the end of the read.
     500              : 
     501      5828303 :         for (blob_start, meta) in blobs_at {
     502      5387042 :             let blob_start_in_buf = blob_start - start_offset;
     503      5387042 :             let first_len_byte = buf[blob_start_in_buf as usize];
     504              : 
     505              :             // Each blob is prefixed by a header containing its size and compression information.
     506              :             // Extract the size and skip that header to find the start of the data.
     507              :             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
     508              :             // 1 byte case and 1 in the 4 byte case.
     509      5387042 :             let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
     510      5311658 :                 (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
     511              :             } else {
     512        75384 :                 let mut blob_size_buf = [0u8; 4];
     513        75384 :                 let offset_in_buf = blob_start_in_buf as usize;
     514        75384 : 
     515        75384 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     516        75384 :                 blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
     517        75384 : 
     518        75384 :                 let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     519        75384 :                 (
     520        75384 :                     4,
     521        75384 :                     u32::from_be_bytes(blob_size_buf) as u64,
     522        75384 :                     compression_bits,
     523        75384 :                 )
     524              :             };
     525              : 
     526      5387042 :             let start = (blob_start_in_buf + size_length) as usize;
     527      5387042 :             let end = start + blob_size as usize;
     528      5387042 : 
     529      5387042 :             metas.push(VectoredBlob {
     530      5387042 :                 start,
     531      5387042 :                 end,
     532      5387042 :                 meta: *meta,
     533      5387042 :                 compression_bits,
     534      5387042 :             });
     535              :         }
     536              : 
     537       441261 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     538       441261 :     }
     539              : }
     540              : 
     541              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     542              : ///
     543              : /// It provides a streaming API for getting read blobs. It returns a batch when
     544              : /// `handle` gets called and when the current key would just exceed the read_size and
     545              : /// max_cnt constraints.
     546              : pub struct StreamingVectoredReadPlanner {
     547              :     read_builder: Option<ChunkedVectoredReadBuilder>,
     548              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     549              :     prev: Option<(Key, Lsn, u64, bool)>,
     550              :     /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
     551              :     /// we will produce a single batch instead of split them.
     552              :     max_read_size: u64,
     553              :     /// Max item count per batch
     554              :     max_cnt: usize,
     555              :     /// Size of the current batch
     556              :     cnt: usize,
     557              : }
     558              : 
     559              : impl StreamingVectoredReadPlanner {
     560         1640 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     561         1640 :         assert!(max_cnt > 0);
     562         1640 :         assert!(max_read_size > 0);
     563         1640 :         Self {
     564         1640 :             read_builder: None,
     565         1640 :             prev: None,
     566         1640 :             max_cnt,
     567         1640 :             max_read_size,
     568         1640 :             cnt: 0,
     569         1640 :         }
     570         1640 :     }
     571              : 
     572      4256120 :     pub fn handle(
     573      4256120 :         &mut self,
     574      4256120 :         key: Key,
     575      4256120 :         lsn: Lsn,
     576      4256120 :         offset: u64,
     577      4256120 :         will_init: bool,
     578      4256120 :     ) -> Option<VectoredRead> {
     579              :         // Implementation note: internally lag behind by one blob such that
     580              :         // we have a start and end offset when initialising [`VectoredRead`]
     581      4256120 :         let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
     582              :             None => {
     583         1412 :                 self.prev = Some((key, lsn, offset, will_init));
     584         1412 :                 return None;
     585              :             }
     586      4254708 :             Some(prev) => prev,
     587      4254708 :         };
     588      4254708 : 
     589      4254708 :         let res = self.add_blob(
     590      4254708 :             prev_key,
     591      4254708 :             prev_lsn,
     592      4254708 :             prev_offset,
     593      4254708 :             offset,
     594      4254708 :             false,
     595      4254708 :             prev_will_init,
     596      4254708 :         );
     597      4254708 : 
     598      4254708 :         self.prev = Some((key, lsn, offset, will_init));
     599      4254708 : 
     600      4254708 :         res
     601      4256120 :     }
     602              : 
     603         1304 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     604         1304 :         let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
     605         1300 :             self.add_blob(
     606         1300 :                 prev_key,
     607         1300 :                 prev_lsn,
     608         1300 :                 prev_offset,
     609         1300 :                 offset,
     610         1300 :                 true,
     611         1300 :                 prev_will_init,
     612         1300 :             )
     613              :         } else {
     614            4 :             None
     615              :         };
     616              : 
     617         1304 :         self.prev = None;
     618         1304 : 
     619         1304 :         res
     620         1304 :     }
     621              : 
     622      4256008 :     fn add_blob(
     623      4256008 :         &mut self,
     624      4256008 :         key: Key,
     625      4256008 :         lsn: Lsn,
     626      4256008 :         start_offset: u64,
     627      4256008 :         end_offset: u64,
     628      4256008 :         is_last_blob_in_read: bool,
     629      4256008 :         will_init: bool,
     630      4256008 :     ) -> Option<VectoredRead> {
     631      4256008 :         match &mut self.read_builder {
     632      4174896 :             Some(read_builder) => {
     633      4174896 :                 let extended = read_builder.extend(
     634      4174896 :                     start_offset,
     635      4174896 :                     end_offset,
     636      4174896 :                     BlobMeta {
     637      4174896 :                         key,
     638      4174896 :                         lsn,
     639      4174896 :                         will_init,
     640      4174896 :                     },
     641      4174896 :                 );
     642      4174896 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     643              :             }
     644        81112 :             None => {
     645        81112 :                 self.read_builder = {
     646        81112 :                     Some(ChunkedVectoredReadBuilder::new_streaming(
     647        81112 :                         start_offset,
     648        81112 :                         end_offset,
     649        81112 :                         BlobMeta {
     650        81112 :                             key,
     651        81112 :                             lsn,
     652        81112 :                             will_init,
     653        81112 :                         },
     654        81112 :                     ))
     655        81112 :                 };
     656        81112 :             }
     657              :         }
     658      4256008 :         let read_builder = self.read_builder.as_mut().unwrap();
     659      4256008 :         self.cnt += 1;
     660      4256008 :         if is_last_blob_in_read
     661      4254708 :             || read_builder.size() >= self.max_read_size as usize
     662      4197020 :             || self.cnt >= self.max_cnt
     663              :         {
     664        81112 :             let prev_read_builder = self.read_builder.take();
     665        81112 :             self.cnt = 0;
     666              : 
     667              :             // `current_read_builder` is None in the first iteration
     668        81112 :             if let Some(read_builder) = prev_read_builder {
     669        81112 :                 return Some(read_builder.build());
     670            0 :             }
     671      4174896 :         }
     672      4174896 :         None
     673      4256008 :     }
     674              : }
     675              : 
     676              : #[cfg(test)]
     677              : mod tests {
     678              :     use anyhow::Error;
     679              : 
     680              :     use crate::context::DownloadBehavior;
     681              :     use crate::page_cache::PAGE_SZ;
     682              :     use crate::task_mgr::TaskKind;
     683              : 
     684              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     685              :     use super::*;
     686              : 
     687           96 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     688              :         const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     689           96 :         assert_eq!(read.start % ALIGN, 0);
     690           96 :         assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
     691              : 
     692          168 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     693           96 : 
     694           96 :         let offsets_in_read: Vec<_> = read
     695           96 :             .blobs_at
     696           96 :             .as_slice()
     697           96 :             .iter()
     698          168 :             .map(|(offset, _)| *offset)
     699           96 :             .collect();
     700           96 : 
     701           96 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     702           96 :     }
     703              : 
     704              :     #[test]
     705            4 :     fn planner_chunked_coalesce_all_test() {
     706              :         use crate::virtual_file;
     707              : 
     708              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     709              : 
     710            4 :         let max_read_size = CHUNK_SIZE as usize * 8;
     711            4 :         let key = Key::MIN;
     712            4 :         let lsn = Lsn(0);
     713            4 : 
     714            4 :         let blob_descriptions = [
     715            4 :             (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
     716            4 :             (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
     717            4 :             (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
     718            4 :             (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
     719            4 :             (key, lsn, CHUNK_SIZE, BlobFlag::None),
     720            4 :             (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
     721            4 :             (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
     722            4 :             (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
     723            4 :             (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
     724            4 :             (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
     725            4 :             (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
     726            4 :             (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
     727            4 :             (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
     728            4 :             (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
     729            4 :         ];
     730            4 : 
     731            4 :         let ranges = [
     732            4 :             &[
     733            4 :                 blob_descriptions[0],
     734            4 :                 blob_descriptions[2],
     735            4 :                 blob_descriptions[4],
     736            4 :                 blob_descriptions[5],
     737            4 :                 blob_descriptions[7],
     738            4 :                 blob_descriptions[8],
     739            4 :                 blob_descriptions[10],
     740            4 :             ],
     741            4 :             &blob_descriptions[11..12],
     742            4 :             &blob_descriptions[13..],
     743            4 :         ];
     744            4 : 
     745            4 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     746           60 :         for (key, lsn, offset, flag) in blob_descriptions {
     747           56 :             planner.handle(key, lsn, offset, flag);
     748           56 :         }
     749              : 
     750            4 :         planner.handle_range_end(652 * 1024);
     751            4 : 
     752            4 :         let reads = planner.finish();
     753            4 : 
     754            4 :         assert_eq!(reads.len(), ranges.len());
     755              : 
     756           12 :         for (idx, read) in reads.iter().enumerate() {
     757           12 :             validate_read(read, ranges[idx]);
     758           12 :         }
     759            4 :     }
     760              : 
     761              :     #[test]
     762            4 :     fn planner_max_read_size_test() {
     763            4 :         let max_read_size = 128 * 1024;
     764            4 :         let key = Key::MIN;
     765            4 :         let lsn = Lsn(0);
     766            4 : 
     767            4 :         let blob_descriptions = vec![
     768            4 :             (key, lsn, 0, BlobFlag::None),
     769            4 :             (key, lsn, 32 * 1024, BlobFlag::None),
     770            4 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     771            4 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     772            4 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     773            4 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     774            4 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     775            4 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     776            4 :         ];
     777            4 : 
     778            4 :         let ranges = [
     779            4 :             &blob_descriptions[0..3],
     780            4 :             &blob_descriptions[3..4],
     781            4 :             &blob_descriptions[4..5],
     782            4 :             &blob_descriptions[5..6],
     783            4 :             &blob_descriptions[6..7],
     784            4 :             &blob_descriptions[7..],
     785            4 :         ];
     786            4 : 
     787            4 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     788           32 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     789           32 :             planner.handle(key, lsn, offset, flag);
     790           32 :         }
     791              : 
     792            4 :         planner.handle_range_end(652 * 1024);
     793            4 : 
     794            4 :         let reads = planner.finish();
     795            4 : 
     796            4 :         assert_eq!(reads.len(), 6);
     797              : 
     798              :         // TODO: could remove zero reads to produce 5 reads here
     799              : 
     800           24 :         for (idx, read) in reads.iter().enumerate() {
     801           24 :             validate_read(read, ranges[idx]);
     802           24 :         }
     803            4 :     }
     804              : 
     805              :     #[test]
     806            4 :     fn planner_replacement_test() {
     807              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     808            4 :         let max_read_size = 128 * CHUNK_SIZE as usize;
     809            4 :         let first_key = Key::MIN;
     810            4 :         let second_key = first_key.next();
     811            4 :         let lsn = Lsn(0);
     812            4 : 
     813            4 :         let blob_descriptions = vec![
     814            4 :             (first_key, lsn, 0, BlobFlag::None),          // First in read 1
     815            4 :             (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
     816            4 :             (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
     817            4 :             (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
     818            4 :             (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
     819            4 :             (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None),       // Last in read 2
     820            4 :         ];
     821            4 : 
     822            4 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     823            4 : 
     824            4 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     825           24 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     826           24 :             planner.handle(key, lsn, offset, flag);
     827           24 :         }
     828              : 
     829            4 :         planner.handle_range_end(6 * CHUNK_SIZE);
     830            4 : 
     831            4 :         let reads = planner.finish();
     832            4 :         assert_eq!(reads.len(), 2);
     833              : 
     834            8 :         for (idx, read) in reads.iter().enumerate() {
     835            8 :             validate_read(read, ranges[idx]);
     836            8 :         }
     837            4 :     }
     838              : 
     839              :     #[test]
     840            4 :     fn streaming_planner_max_read_size_test() {
     841            4 :         let max_read_size = 128 * 1024;
     842            4 :         let key = Key::MIN;
     843            4 :         let lsn = Lsn(0);
     844            4 : 
     845            4 :         let blob_descriptions = vec![
     846            4 :             (key, lsn, 0, BlobFlag::None),
     847            4 :             (key, lsn, 32 * 1024, BlobFlag::None),
     848            4 :             (key, lsn, 96 * 1024, BlobFlag::None),
     849            4 :             (key, lsn, 128 * 1024, BlobFlag::None),
     850            4 :             (key, lsn, 198 * 1024, BlobFlag::None),
     851            4 :             (key, lsn, 268 * 1024, BlobFlag::None),
     852            4 :             (key, lsn, 396 * 1024, BlobFlag::None),
     853            4 :             (key, lsn, 652 * 1024, BlobFlag::None),
     854            4 :         ];
     855            4 : 
     856            4 :         let ranges = [
     857            4 :             &blob_descriptions[0..3],
     858            4 :             &blob_descriptions[3..5],
     859            4 :             &blob_descriptions[5..6],
     860            4 :             &blob_descriptions[6..7],
     861            4 :             &blob_descriptions[7..],
     862            4 :         ];
     863            4 : 
     864            4 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     865            4 :         let mut reads = Vec::new();
     866           32 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     867           32 :             reads.extend(planner.handle(key, lsn, offset, false));
     868           32 :         }
     869            4 :         reads.extend(planner.handle_range_end(652 * 1024));
     870            4 : 
     871            4 :         assert_eq!(reads.len(), ranges.len());
     872              : 
     873           20 :         for (idx, read) in reads.iter().enumerate() {
     874           20 :             validate_read(read, ranges[idx]);
     875           20 :         }
     876            4 :     }
     877              : 
     878              :     #[test]
     879            4 :     fn streaming_planner_max_cnt_test() {
     880            4 :         let max_read_size = 1024 * 1024;
     881            4 :         let key = Key::MIN;
     882            4 :         let lsn = Lsn(0);
     883            4 : 
     884            4 :         let blob_descriptions = vec![
     885            4 :             (key, lsn, 0, BlobFlag::None),
     886            4 :             (key, lsn, 32 * 1024, BlobFlag::None),
     887            4 :             (key, lsn, 96 * 1024, BlobFlag::None),
     888            4 :             (key, lsn, 128 * 1024, BlobFlag::None),
     889            4 :             (key, lsn, 198 * 1024, BlobFlag::None),
     890            4 :             (key, lsn, 268 * 1024, BlobFlag::None),
     891            4 :             (key, lsn, 396 * 1024, BlobFlag::None),
     892            4 :             (key, lsn, 652 * 1024, BlobFlag::None),
     893            4 :         ];
     894            4 : 
     895            4 :         let ranges = [
     896            4 :             &blob_descriptions[0..2],
     897            4 :             &blob_descriptions[2..4],
     898            4 :             &blob_descriptions[4..6],
     899            4 :             &blob_descriptions[6..],
     900            4 :         ];
     901            4 : 
     902            4 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     903            4 :         let mut reads = Vec::new();
     904           32 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     905           32 :             reads.extend(planner.handle(key, lsn, offset, false));
     906           32 :         }
     907            4 :         reads.extend(planner.handle_range_end(652 * 1024));
     908            4 : 
     909            4 :         assert_eq!(reads.len(), ranges.len());
     910              : 
     911           16 :         for (idx, read) in reads.iter().enumerate() {
     912           16 :             validate_read(read, ranges[idx]);
     913           16 :         }
     914            4 :     }
     915              : 
     916              :     #[test]
     917            4 :     fn streaming_planner_edge_test() {
     918            4 :         let max_read_size = 1024 * 1024;
     919            4 :         let key = Key::MIN;
     920            4 :         let lsn = Lsn(0);
     921            4 :         {
     922            4 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     923            4 :             let mut reads = Vec::new();
     924            4 :             reads.extend(planner.handle_range_end(652 * 1024));
     925            4 :             assert!(reads.is_empty());
     926              :         }
     927              :         {
     928            4 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     929            4 :             let mut reads = Vec::new();
     930            4 :             reads.extend(planner.handle(key, lsn, 0, false));
     931            4 :             reads.extend(planner.handle_range_end(652 * 1024));
     932            4 :             assert_eq!(reads.len(), 1);
     933            4 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     934            4 :         }
     935            4 :         {
     936            4 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     937            4 :             let mut reads = Vec::new();
     938            4 :             reads.extend(planner.handle(key, lsn, 0, false));
     939            4 :             reads.extend(planner.handle(key, lsn, 128 * 1024, false));
     940            4 :             reads.extend(planner.handle_range_end(652 * 1024));
     941            4 :             assert_eq!(reads.len(), 2);
     942            4 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     943            4 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     944            4 :         }
     945            4 :         {
     946            4 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     947            4 :             let mut reads = Vec::new();
     948            4 :             reads.extend(planner.handle(key, lsn, 0, false));
     949            4 :             reads.extend(planner.handle(key, lsn, 128 * 1024, false));
     950            4 :             reads.extend(planner.handle_range_end(652 * 1024));
     951            4 :             assert_eq!(reads.len(), 1);
     952            4 :             validate_read(
     953            4 :                 &reads[0],
     954            4 :                 &[
     955            4 :                     (key, lsn, 0, BlobFlag::None),
     956            4 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     957            4 :                 ],
     958            4 :             );
     959            4 :         }
     960            4 :     }
     961              : 
     962           16 :     async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
     963           16 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     964           16 :         let (_temp_dir, pathbuf, offsets) =
     965           16 :             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
     966              : 
     967           16 :         let file = VirtualFile::open(&pathbuf, &ctx).await?;
     968           16 :         let file_len = std::fs::metadata(&pathbuf)?.len();
     969           16 : 
     970           16 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
     971         8240 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
     972           16 :         let mut buf = IoBufferMut::with_capacity(reserved_bytes);
     973           16 : 
     974           16 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
     975           16 :         let meta = BlobMeta {
     976           16 :             key: Key::MIN,
     977           16 :             lsn: Lsn(0),
     978           16 :             will_init: false,
     979           16 :         };
     980              : 
     981         8240 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     982         8240 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
     983         8240 :             if idx + 1 == offsets.len() {
     984           16 :                 continue;
     985         8224 :             }
     986         8224 :             let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
     987         8224 :             let read = read_builder.build();
     988         8224 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
     989         8224 :             assert_eq!(result.blobs.len(), 1);
     990         8224 :             let read_blob = &result.blobs[0];
     991         8224 :             let view = BufView::new_slice(&result.buf);
     992         8224 :             let read_buf = read_blob.read(&view).await?;
     993         8224 :             assert_eq!(
     994         8224 :                 &blob[..],
     995         8224 :                 &read_buf[..],
     996            0 :                 "mismatch for idx={idx} at offset={offset}"
     997              :             );
     998         8224 :             buf = result.buf;
     999              :         }
    1000           16 :         Ok(())
    1001           16 :     }
    1002              : 
    1003              :     #[tokio::test]
    1004            4 :     async fn test_really_big_array() -> Result<(), Error> {
    1005            4 :         let blobs = &[
    1006            4 :             b"test".to_vec(),
    1007            4 :             random_array(10 * PAGE_SZ),
    1008            4 :             b"hello".to_vec(),
    1009            4 :             random_array(66 * PAGE_SZ),
    1010            4 :             vec![0xf3; 24 * PAGE_SZ],
    1011            4 :             b"foobar".to_vec(),
    1012            4 :         ];
    1013            4 :         round_trip_test_compressed(blobs, false).await?;
    1014            4 :         round_trip_test_compressed(blobs, true).await?;
    1015            4 :         Ok(())
    1016            4 :     }
    1017              : 
    1018              :     #[tokio::test]
    1019            4 :     async fn test_arrays_inc() -> Result<(), Error> {
    1020            4 :         let blobs = (0..PAGE_SZ / 8)
    1021         4096 :             .map(|v| random_array(v * 16))
    1022            4 :             .collect::<Vec<_>>();
    1023            4 :         round_trip_test_compressed(&blobs, false).await?;
    1024            4 :         round_trip_test_compressed(&blobs, true).await?;
    1025            4 :         Ok(())
    1026            4 :     }
    1027              : }
        

Generated by: LCOV version 2.1-beta