LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 92.7 % 386 358
Test Date: 2025-03-12 16:10:49 Functions: 91.3 % 103 94

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as a one byte. If it's larger than that, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9              : //! by peeking at the first byte. For blobs larger than 128 bits,
      10              : //! we also specify three reserved bits, only one of the three bit
      11              : //! patterns is currently in use (0b011) and signifies compression
      12              : //! with zstd.
      13              : //!
      14              : //! len <  128: 0XXXXXXX
      15              : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      16              : //!
      17              : use std::cmp::min;
      18              : use std::io::{Error, ErrorKind};
      19              : 
      20              : use async_compression::Level;
      21              : use bytes::{BufMut, BytesMut};
      22              : use pageserver_api::models::ImageCompressionAlgorithm;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
      25              : use tracing::warn;
      26              : 
      27              : use crate::context::RequestContext;
      28              : use crate::page_cache::PAGE_SZ;
      29              : use crate::tenant::block_io::BlockCursor;
      30              : use crate::virtual_file::VirtualFile;
      31              : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
      32              : 
      33              : #[derive(Copy, Clone, Debug)]
      34              : pub struct CompressionInfo {
      35              :     pub written_compressed: bool,
      36              :     pub compressed_size: Option<usize>,
      37              : }
      38              : 
      39              : impl BlockCursor<'_> {
      40              :     /// Read a blob into a new buffer.
      41        16592 :     pub async fn read_blob(
      42        16592 :         &self,
      43        16592 :         offset: u64,
      44        16592 :         ctx: &RequestContext,
      45        16592 :     ) -> Result<Vec<u8>, std::io::Error> {
      46        16592 :         let mut buf = Vec::new();
      47        16592 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      48        16592 :         Ok(buf)
      49        16592 :     }
      50              :     /// Read blob into the given buffer. Any previous contents in the buffer
      51              :     /// are overwritten.
      52        16720 :     pub async fn read_blob_into_buf(
      53        16720 :         &self,
      54        16720 :         offset: u64,
      55        16720 :         dstbuf: &mut Vec<u8>,
      56        16720 :         ctx: &RequestContext,
      57        16720 :     ) -> Result<(), std::io::Error> {
      58        16720 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      59        16720 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      60              : 
      61        16720 :         let mut buf = self.read_blk(blknum, ctx).await?;
      62              : 
      63              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      64        16720 :         let first_len_byte = buf[off];
      65        16720 :         let len: usize = if first_len_byte < 0x80 {
      66              :             // 1-byte length header
      67         4448 :             off += 1;
      68         4448 :             first_len_byte as usize
      69              :         } else {
      70              :             // 4-byte length header
      71        12272 :             let mut len_buf = [0u8; 4];
      72        12272 :             let thislen = PAGE_SZ - off;
      73        12272 :             if thislen < 4 {
      74              :                 // it is split across two pages
      75            0 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      76            0 :                 blknum += 1;
      77            0 :                 buf = self.read_blk(blknum, ctx).await?;
      78            0 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      79            0 :                 off = 4 - thislen;
      80        12272 :             } else {
      81        12272 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      82        12272 :                 off += 4;
      83        12272 :             }
      84        12272 :             let bit_mask = if self.read_compressed {
      85           40 :                 !LEN_COMPRESSION_BIT_MASK
      86              :             } else {
      87        12232 :                 0x7f
      88              :             };
      89        12272 :             len_buf[0] &= bit_mask;
      90        12272 :             u32::from_be_bytes(len_buf) as usize
      91              :         };
      92        16720 :         let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
      93        16720 : 
      94        16720 :         let mut tmp_buf = Vec::new();
      95              :         let buf_to_write;
      96        16720 :         let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
      97        16712 :             if compression_bits > BYTE_UNCOMPRESSED {
      98            0 :                 warn!("reading key above future limit ({len} bytes)");
      99        16712 :             }
     100        16712 :             buf_to_write = dstbuf;
     101        16712 :             None
     102            8 :         } else if compression_bits == BYTE_ZSTD {
     103            8 :             buf_to_write = &mut tmp_buf;
     104            8 :             Some(dstbuf)
     105              :         } else {
     106            0 :             let error = std::io::Error::new(
     107            0 :                 std::io::ErrorKind::InvalidData,
     108            0 :                 format!("invalid compression byte {compression_bits:x}"),
     109            0 :             );
     110            0 :             return Err(error);
     111              :         };
     112              : 
     113        16720 :         buf_to_write.clear();
     114        16720 :         buf_to_write.reserve(len);
     115        16720 : 
     116        16720 :         // Read the payload
     117        16720 :         let mut remain = len;
     118        58872 :         while remain > 0 {
     119        42152 :             let mut page_remain = PAGE_SZ - off;
     120        42152 :             if page_remain == 0 {
     121              :                 // continue on next page
     122        25512 :                 blknum += 1;
     123        25512 :                 buf = self.read_blk(blknum, ctx).await?;
     124        25512 :                 off = 0;
     125        25512 :                 page_remain = PAGE_SZ;
     126        16640 :             }
     127        42152 :             let this_blk_len = min(remain, page_remain);
     128        42152 :             buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
     129        42152 :             remain -= this_blk_len;
     130        42152 :             off += this_blk_len;
     131              :         }
     132              : 
     133        16720 :         if let Some(dstbuf) = compression {
     134            8 :             if compression_bits == BYTE_ZSTD {
     135            8 :                 let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
     136            8 :                 decoder.write_all(buf_to_write).await?;
     137            8 :                 decoder.flush().await?;
     138              :             } else {
     139            0 :                 unreachable!("already checked above")
     140              :             }
     141        16712 :         }
     142              : 
     143        16720 :         Ok(())
     144        16720 :     }
     145              : }
     146              : 
     147              : /// Reserved bits for length and compression
     148              : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
     149              : 
     150              : /// The maximum size of blobs we support. The highest few bits
     151              : /// are reserved for compression and other further uses.
     152              : pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
     153              : 
     154              : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
     155              : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
     156              : 
     157              : /// A wrapper of `VirtualFile` that allows users to write blobs.
     158              : ///
     159              : /// If a `BlobWriter` is dropped, the internal buffer will be
     160              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
     161              : /// manually before dropping.
     162              : pub struct BlobWriter<const BUFFERED: bool> {
     163              :     inner: VirtualFile,
     164              :     offset: u64,
     165              :     /// A buffer to save on write calls, only used if BUFFERED=true
     166              :     buf: Vec<u8>,
     167              :     /// We do tiny writes for the length headers; they need to be in an owned buffer;
     168              :     io_buf: Option<BytesMut>,
     169              : }
     170              : 
     171              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     172         4220 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     173         4220 :         Self {
     174         4220 :             inner,
     175         4220 :             offset: start_offset,
     176         4220 :             buf: Vec::with_capacity(Self::CAPACITY),
     177         4220 :             io_buf: Some(BytesMut::new()),
     178         4220 :         }
     179         4220 :     }
     180              : 
     181      4099652 :     pub fn size(&self) -> u64 {
     182      4099652 :         self.offset
     183      4099652 :     }
     184              : 
     185              :     const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
     186              : 
     187              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     188              :     /// You need to make sure that the internal buffer is empty, otherwise
     189              :     /// data will be written in wrong order.
     190              :     #[inline(always)]
     191      2203844 :     async fn write_all_unbuffered<Buf: IoBuf + Send>(
     192      2203844 :         &mut self,
     193      2203844 :         src_buf: FullSlice<Buf>,
     194      2203844 :         ctx: &RequestContext,
     195      2203844 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     196      2203844 :         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
     197      2203844 :         let nbytes = match res {
     198      2203844 :             Ok(nbytes) => nbytes,
     199            0 :             Err(e) => return (src_buf, Err(e)),
     200              :         };
     201      2203844 :         self.offset += nbytes as u64;
     202      2203844 :         (src_buf, Ok(()))
     203      2203844 :     }
     204              : 
     205              :     #[inline(always)]
     206              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     207        24592 :     pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
     208        24592 :         let buf = std::mem::take(&mut self.buf);
     209        24592 :         let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
     210        24592 :         res?;
     211        24592 :         let mut buf = slice.into_raw_slice().into_inner();
     212        24592 :         buf.clear();
     213        24592 :         self.buf = buf;
     214        24592 :         Ok(())
     215        24592 :     }
     216              : 
     217              :     #[inline(always)]
     218              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     219     25939688 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     220     25939688 :         let remaining = Self::CAPACITY - self.buf.len();
     221     25939688 :         let to_copy = src_buf.len().min(remaining);
     222     25939688 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     223     25939688 :         self.offset += to_copy as u64;
     224     25939688 :         to_copy
     225     25939688 :     }
     226              : 
     227              :     /// Internal, possibly buffered, write function
     228     28122032 :     async fn write_all<Buf: IoBuf + Send>(
     229     28122032 :         &mut self,
     230     28122032 :         src_buf: FullSlice<Buf>,
     231     28122032 :         ctx: &RequestContext,
     232     28122032 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     233     28122032 :         let src_buf = src_buf.into_raw_slice();
     234     28122032 :         let src_buf_bounds = src_buf.bounds();
     235     28122032 :         let restore = move |src_buf_slice: Slice<_>| {
     236     25918188 :             FullSlice::must_new(Slice::from_buf_bounds(
     237     25918188 :                 src_buf_slice.into_inner(),
     238     25918188 :                 src_buf_bounds,
     239     25918188 :             ))
     240     25918188 :         };
     241              : 
     242     28122032 :         if !BUFFERED {
     243      2203384 :             assert!(self.buf.is_empty());
     244      2203384 :             return self
     245      2203384 :                 .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
     246      2203384 :                 .await;
     247     25918648 :         }
     248     25918648 :         let remaining = Self::CAPACITY - self.buf.len();
     249     25918648 :         let src_buf_len = src_buf.bytes_init();
     250     25918648 :         if src_buf_len == 0 {
     251           48 :             return (restore(src_buf), Ok(()));
     252     25918600 :         }
     253     25918600 :         let mut src_buf = src_buf.slice(0..src_buf_len);
     254     25918600 :         // First try to copy as much as we can into the buffer
     255     25918600 :         if remaining > 0 {
     256     25918600 :             let copied = self.write_into_buffer(&src_buf);
     257     25918600 :             src_buf = src_buf.slice(copied..);
     258     25918600 :         }
     259              :         // Then, if the buffer is full, flush it out
     260     25918600 :         if self.buf.len() == Self::CAPACITY {
     261        21628 :             if let Err(e) = self.flush_buffer(ctx).await {
     262            0 :                 return (restore(src_buf), Err(e));
     263        21628 :             }
     264     25896972 :         }
     265              :         // Finally, write the tail of src_buf:
     266              :         // If it wholly fits into the buffer without
     267              :         // completely filling it, then put it there.
     268              :         // If not, write it out directly.
     269     25918600 :         let src_buf = if !src_buf.is_empty() {
     270        21548 :             assert_eq!(self.buf.len(), 0);
     271        21548 :             if src_buf.len() < Self::CAPACITY {
     272        21088 :                 let copied = self.write_into_buffer(&src_buf);
     273        21088 :                 // We just verified above that src_buf fits into our internal buffer.
     274        21088 :                 assert_eq!(copied, src_buf.len());
     275        21088 :                 restore(src_buf)
     276              :             } else {
     277          460 :                 let (src_buf, res) = self
     278          460 :                     .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
     279          460 :                     .await;
     280          460 :                 if let Err(e) = res {
     281            0 :                     return (src_buf, Err(e));
     282          460 :                 }
     283          460 :                 src_buf
     284              :             }
     285              :         } else {
     286     25897052 :             restore(src_buf)
     287              :         };
     288     25918600 :         (src_buf, Ok(()))
     289     28122032 :     }
     290              : 
     291              :     /// Write a blob of data. Returns the offset that it was written to,
     292              :     /// which can be used to retrieve the data later.
     293        20696 :     pub async fn write_blob<Buf: IoBuf + Send>(
     294        20696 :         &mut self,
     295        20696 :         srcbuf: FullSlice<Buf>,
     296        20696 :         ctx: &RequestContext,
     297        20696 :     ) -> (FullSlice<Buf>, Result<u64, Error>) {
     298        20696 :         let (buf, res) = self
     299        20696 :             .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
     300        20696 :             .await;
     301        20696 :         (buf, res.map(|(off, _compression_info)| off))
     302        20696 :     }
     303              : 
     304              :     /// Write a blob of data. Returns the offset that it was written to,
     305              :     /// which can be used to retrieve the data later.
     306     14061016 :     pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
     307     14061016 :         &mut self,
     308     14061016 :         srcbuf: FullSlice<Buf>,
     309     14061016 :         ctx: &RequestContext,
     310     14061016 :         algorithm: ImageCompressionAlgorithm,
     311     14061016 :     ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
     312     14061016 :         let offset = self.offset;
     313     14061016 :         let mut compression_info = CompressionInfo {
     314     14061016 :             written_compressed: false,
     315     14061016 :             compressed_size: None,
     316     14061016 :         };
     317     14061016 : 
     318     14061016 :         let len = srcbuf.len();
     319     14061016 : 
     320     14061016 :         let mut io_buf = self.io_buf.take().expect("we always put it back below");
     321     14061016 :         io_buf.clear();
     322     14061016 :         let mut compressed_buf = None;
     323     14061016 :         let ((io_buf_slice, hdr_res), srcbuf) = async {
     324     14061016 :             if len < 128 {
     325              :                 // Short blob. Write a 1-byte length header
     326     13988280 :                 io_buf.put_u8(len as u8);
     327     13988280 :                 (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
     328              :             } else {
     329              :                 // Write a 4-byte length header
     330        72736 :                 if len > MAX_SUPPORTED_BLOB_LEN {
     331            0 :                     return (
     332            0 :                         (
     333            0 :                             io_buf.slice_len(),
     334            0 :                             Err(Error::new(
     335            0 :                                 ErrorKind::Other,
     336            0 :                                 format!("blob too large ({len} bytes)"),
     337            0 :                             )),
     338            0 :                         ),
     339            0 :                         srcbuf,
     340            0 :                     );
     341        72736 :                 }
     342        72736 :                 let (high_bit_mask, len_written, srcbuf) = match algorithm {
     343        20104 :                     ImageCompressionAlgorithm::Zstd { level } => {
     344        20104 :                         let mut encoder = if let Some(level) = level {
     345        20104 :                             async_compression::tokio::write::ZstdEncoder::with_quality(
     346        20104 :                                 Vec::new(),
     347        20104 :                                 Level::Precise(level.into()),
     348        20104 :                             )
     349              :                         } else {
     350            0 :                             async_compression::tokio::write::ZstdEncoder::new(Vec::new())
     351              :                         };
     352        20104 :                         encoder.write_all(&srcbuf[..]).await.unwrap();
     353        20104 :                         encoder.shutdown().await.unwrap();
     354        20104 :                         let compressed = encoder.into_inner();
     355        20104 :                         compression_info.compressed_size = Some(compressed.len());
     356        20104 :                         if compressed.len() < len {
     357           12 :                             compression_info.written_compressed = true;
     358           12 :                             let compressed_len = compressed.len();
     359           12 :                             compressed_buf = Some(compressed);
     360           12 :                             (BYTE_ZSTD, compressed_len, srcbuf)
     361              :                         } else {
     362        20092 :                             (BYTE_UNCOMPRESSED, len, srcbuf)
     363              :                         }
     364              :                     }
     365        52632 :                     ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
     366              :                 };
     367        72736 :                 let mut len_buf = (len_written as u32).to_be_bytes();
     368        72736 :                 assert_eq!(len_buf[0] & 0xf0, 0);
     369        72736 :                 len_buf[0] |= high_bit_mask;
     370        72736 :                 io_buf.extend_from_slice(&len_buf[..]);
     371        72736 :                 (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
     372              :             }
     373     14061016 :         }
     374     14061016 :         .await;
     375     14061016 :         self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
     376     14061016 :         match hdr_res {
     377     14061016 :             Ok(_) => (),
     378            0 :             Err(e) => return (srcbuf, Err(e)),
     379              :         }
     380     14061016 :         let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
     381           12 :             let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
     382           12 :             (srcbuf, res)
     383              :         } else {
     384     14061004 :             self.write_all(srcbuf, ctx).await
     385              :         };
     386     14061016 :         (srcbuf, res.map(|_| (offset, compression_info)))
     387     14061016 :     }
     388              : }
     389              : 
     390              : impl BlobWriter<true> {
     391              :     /// Access the underlying `VirtualFile`.
     392              :     ///
     393              :     /// This function flushes the internal buffer before giving access
     394              :     /// to the underlying `VirtualFile`.
     395         2884 :     pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
     396         2884 :         self.flush_buffer(ctx).await?;
     397         2884 :         Ok(self.inner)
     398         2884 :     }
     399              : 
     400              :     /// Access the underlying `VirtualFile`.
     401              :     ///
     402              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     403              :     /// the internal buffer before giving access.
     404           52 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     405           52 :         self.inner
     406           52 :     }
     407              : }
     408              : 
     409              : impl BlobWriter<false> {
     410              :     /// Access the underlying `VirtualFile`.
     411         1204 :     pub fn into_inner(self) -> VirtualFile {
     412         1204 :         self.inner
     413         1204 :     }
     414              : }
     415              : 
     416              : #[cfg(test)]
     417              : pub(crate) mod tests {
     418              :     use camino::Utf8PathBuf;
     419              :     use camino_tempfile::Utf8TempDir;
     420              :     use rand::{Rng, SeedableRng};
     421              : 
     422              :     use super::*;
     423              :     use crate::context::DownloadBehavior;
     424              :     use crate::task_mgr::TaskKind;
     425              :     use crate::tenant::block_io::BlockReaderRef;
     426              : 
     427           48 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     428           48 :         round_trip_test_compressed::<BUFFERED>(blobs, false).await
     429           48 :     }
     430              : 
     431           80 :     pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
     432           80 :         blobs: &[Vec<u8>],
     433           80 :         compression: bool,
     434           80 :         ctx: &RequestContext,
     435           80 :     ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
     436           80 :         let temp_dir = camino_tempfile::tempdir()?;
     437           80 :         let pathbuf = temp_dir.path().join("file");
     438           80 : 
     439           80 :         // Write part (in block to drop the file)
     440           80 :         let mut offsets = Vec::new();
     441              :         {
     442           80 :             let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
     443           80 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     444        24816 :             for blob in blobs.iter() {
     445        24816 :                 let (_, res) = if compression {
     446         4200 :                     let res = wtr
     447         4200 :                         .write_blob_maybe_compressed(
     448         4200 :                             blob.clone().slice_len(),
     449         4200 :                             ctx,
     450         4200 :                             ImageCompressionAlgorithm::Zstd { level: Some(1) },
     451         4200 :                         )
     452         4200 :                         .await;
     453         4200 :                     (res.0, res.1.map(|(off, _)| off))
     454              :                 } else {
     455        20616 :                     wtr.write_blob(blob.clone().slice_len(), ctx).await
     456              :                 };
     457        24816 :                 let offs = res?;
     458        24816 :                 offsets.push(offs);
     459              :             }
     460              :             // Write out one page worth of zeros so that we can
     461              :             // read again with read_blk
     462           80 :             let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
     463           80 :             let offs = res?;
     464           80 :             println!("Writing final blob at offs={offs}");
     465           80 :             wtr.flush_buffer(ctx).await?;
     466              :         }
     467           80 :         Ok((temp_dir, pathbuf, offsets))
     468           80 :     }
     469              : 
     470           64 :     async fn round_trip_test_compressed<const BUFFERED: bool>(
     471           64 :         blobs: &[Vec<u8>],
     472           64 :         compression: bool,
     473           64 :     ) -> Result<(), Error> {
     474           64 :         let ctx =
     475           64 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     476           64 :         let (_temp_dir, pathbuf, offsets) =
     477           64 :             write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
     478              : 
     479           64 :         let file = VirtualFile::open(pathbuf, &ctx).await?;
     480           64 :         let rdr = BlockReaderRef::VirtualFile(&file);
     481           64 :         let rdr = BlockCursor::new_with_compression(rdr, compression);
     482        16576 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     483        16576 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     484        16576 :             assert_eq!(
     485        16576 :                 blob, &blob_read,
     486            0 :                 "mismatch for idx={idx} at offset={offset}"
     487              :             );
     488              :         }
     489           64 :         Ok(())
     490           64 :     }
     491              : 
     492        12316 :     pub(crate) fn random_array(len: usize) -> Vec<u8> {
     493        12316 :         let mut rng = rand::thread_rng();
     494    136303808 :         (0..len).map(|_| rng.r#gen()).collect::<_>()
     495        12316 :     }
     496              : 
     497              :     #[tokio::test]
     498            4 :     async fn test_one() -> Result<(), Error> {
     499            4 :         let blobs = &[vec![12, 21, 22]];
     500            4 :         round_trip_test::<false>(blobs).await?;
     501            4 :         round_trip_test::<true>(blobs).await?;
     502            4 :         Ok(())
     503            4 :     }
     504              : 
     505              :     #[tokio::test]
     506            4 :     async fn test_hello_simple() -> Result<(), Error> {
     507            4 :         let blobs = &[
     508            4 :             vec![0, 1, 2, 3],
     509            4 :             b"Hello, World!".to_vec(),
     510            4 :             Vec::new(),
     511            4 :             b"foobar".to_vec(),
     512            4 :         ];
     513            4 :         round_trip_test::<false>(blobs).await?;
     514            4 :         round_trip_test::<true>(blobs).await?;
     515            4 :         round_trip_test_compressed::<false>(blobs, true).await?;
     516            4 :         round_trip_test_compressed::<true>(blobs, true).await?;
     517            4 :         Ok(())
     518            4 :     }
     519              : 
     520              :     #[tokio::test]
     521            4 :     async fn test_really_big_array() -> Result<(), Error> {
     522            4 :         let blobs = &[
     523            4 :             b"test".to_vec(),
     524            4 :             random_array(10 * PAGE_SZ),
     525            4 :             b"hello".to_vec(),
     526            4 :             random_array(66 * PAGE_SZ),
     527            4 :             vec![0xf3; 24 * PAGE_SZ],
     528            4 :             b"foobar".to_vec(),
     529            4 :         ];
     530            4 :         round_trip_test::<false>(blobs).await?;
     531            4 :         round_trip_test::<true>(blobs).await?;
     532            4 :         round_trip_test_compressed::<false>(blobs, true).await?;
     533            4 :         round_trip_test_compressed::<true>(blobs, true).await?;
     534            4 :         Ok(())
     535            4 :     }
     536              : 
     537              :     #[tokio::test]
     538            4 :     async fn test_arrays_inc() -> Result<(), Error> {
     539            4 :         let blobs = (0..PAGE_SZ / 8)
     540         4096 :             .map(|v| random_array(v * 16))
     541            4 :             .collect::<Vec<_>>();
     542            4 :         round_trip_test::<false>(&blobs).await?;
     543            4 :         round_trip_test::<true>(&blobs).await?;
     544            4 :         Ok(())
     545            4 :     }
     546              : 
     547              :     #[tokio::test]
     548            4 :     async fn test_arrays_random_size() -> Result<(), Error> {
     549            4 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     550            4 :         let blobs = (0..1024)
     551         4096 :             .map(|_| {
     552         4096 :                 let mut sz: u16 = rng.r#gen();
     553         4096 :                 // Make 50% of the arrays small
     554         4096 :                 if rng.r#gen() {
     555         2064 :                     sz &= 63;
     556         2064 :                 }
     557         4096 :                 random_array(sz.into())
     558         4096 :             })
     559            4 :             .collect::<Vec<_>>();
     560            4 :         round_trip_test::<false>(&blobs).await?;
     561            4 :         round_trip_test::<true>(&blobs).await?;
     562            4 :         Ok(())
     563            4 :     }
     564              : 
     565              :     #[tokio::test]
     566            4 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     567            4 :         let blobs = &[
     568            4 :             random_array(PAGE_SZ - 4),
     569            4 :             random_array(PAGE_SZ - 4),
     570            4 :             random_array(PAGE_SZ - 4),
     571            4 :         ];
     572            4 :         round_trip_test::<false>(blobs).await?;
     573            4 :         round_trip_test::<true>(blobs).await?;
     574            4 :         Ok(())
     575            4 :     }
     576              : }
        

Generated by: LCOV version 2.1-beta