LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info Lines: 93.3 % 358 334
Test Date: 2024-07-03 15:33:13 Functions: 87.0 % 92 80

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as a one byte. If it's larger than that, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9              : //! by peeking at the first byte. For blobs larger than 128 bits,
      10              : //! we also specify three reserved bits, only one of the three bit
      11              : //! patterns is currently in use (0b011) and signifies compression
      12              : //! with zstd.
      13              : //!
      14              : //! len <  128: 0XXXXXXX
      15              : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      16              : //!
      17              : use async_compression::Level;
      18              : use bytes::{BufMut, BytesMut};
      19              : use pageserver_api::models::ImageCompressionAlgorithm;
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
      22              : 
      23              : use crate::context::RequestContext;
      24              : use crate::page_cache::PAGE_SZ;
      25              : use crate::tenant::block_io::BlockCursor;
      26              : use crate::virtual_file::VirtualFile;
      27              : use std::cmp::min;
      28              : use std::io::{Error, ErrorKind};
      29              : 
      30              : impl<'a> BlockCursor<'a> {
      31              :     /// Read a blob into a new buffer.
      32      2598626 :     pub async fn read_blob(
      33      2598626 :         &self,
      34      2598626 :         offset: u64,
      35      2598626 :         ctx: &RequestContext,
      36      2598626 :     ) -> Result<Vec<u8>, std::io::Error> {
      37      2598626 :         let mut buf = Vec::new();
      38      2598626 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      39      2598626 :         Ok(buf)
      40      2598626 :     }
      41              :     /// Read blob into the given buffer. Any previous contents in the buffer
      42              :     /// are overwritten.
      43      7106068 :     pub async fn read_blob_into_buf(
      44      7106068 :         &self,
      45      7106068 :         offset: u64,
      46      7106068 :         dstbuf: &mut Vec<u8>,
      47      7106068 :         ctx: &RequestContext,
      48      7106068 :     ) -> Result<(), std::io::Error> {
      49      7106068 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      50      7106068 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      51              : 
      52      7106068 :         let mut buf = self.read_blk(blknum, ctx).await?;
      53              : 
      54              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      55      7106068 :         let first_len_byte = buf[off];
      56      7106068 :         let len: usize = if first_len_byte < 0x80 {
      57              :             // 1-byte length header
      58      7091246 :             off += 1;
      59      7091246 :             first_len_byte as usize
      60              :         } else {
      61              :             // 4-byte length header
      62        14822 :             let mut len_buf = [0u8; 4];
      63        14822 :             let thislen = PAGE_SZ - off;
      64        14822 :             if thislen < 4 {
      65              :                 // it is split across two pages
      66            3 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      67            3 :                 blknum += 1;
      68            3 :                 buf = self.read_blk(blknum, ctx).await?;
      69            3 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      70            3 :                 off = 4 - thislen;
      71        14819 :             } else {
      72        14819 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      73        14819 :                 off += 4;
      74        14819 :             }
      75        14822 :             len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
      76        14822 :             u32::from_be_bytes(len_buf) as usize
      77              :         };
      78      7106068 :         let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
      79      7106068 : 
      80      7106068 :         let mut tmp_buf = Vec::new();
      81              :         let buf_to_write;
      82      7106068 :         let compression = if compression_bits <= BYTE_UNCOMPRESSED {
      83      7106064 :             buf_to_write = dstbuf;
      84      7106064 :             None
      85            4 :         } else if compression_bits == BYTE_ZSTD {
      86            4 :             buf_to_write = &mut tmp_buf;
      87            4 :             Some(dstbuf)
      88              :         } else {
      89            0 :             let error = std::io::Error::new(
      90            0 :                 std::io::ErrorKind::InvalidData,
      91            0 :                 format!("invalid compression byte {compression_bits:x}"),
      92            0 :             );
      93            0 :             return Err(error);
      94              :         };
      95              : 
      96      7106068 :         buf_to_write.clear();
      97      7106068 :         buf_to_write.reserve(len);
      98      7106068 : 
      99      7106068 :         // Read the payload
     100      7106068 :         let mut remain = len;
     101     14295497 :         while remain > 0 {
     102      7189429 :             let mut page_remain = PAGE_SZ - off;
     103      7189429 :             if page_remain == 0 {
     104              :                 // continue on next page
     105        84040 :                 blknum += 1;
     106        84040 :                 buf = self.read_blk(blknum, ctx).await?;
     107        84040 :                 off = 0;
     108        84040 :                 page_remain = PAGE_SZ;
     109      7105389 :             }
     110      7189429 :             let this_blk_len = min(remain, page_remain);
     111      7189429 :             buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
     112      7189429 :             remain -= this_blk_len;
     113      7189429 :             off += this_blk_len;
     114              :         }
     115              : 
     116      7106068 :         if let Some(dstbuf) = compression {
     117            4 :             if compression_bits == BYTE_ZSTD {
     118            4 :                 let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
     119            4 :                 decoder.write_all(buf_to_write).await?;
     120            4 :                 decoder.flush().await?;
     121              :             } else {
     122            0 :                 unreachable!("already checked above")
     123              :             }
     124      7106064 :         }
     125              : 
     126      7106068 :         Ok(())
     127      7106068 :     }
     128              : }
     129              : 
     130              : /// Reserved bits for length and compression
     131              : const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
     132              : 
     133              : /// The maximum size of blobs we support. The highest few bits
     134              : /// are reserved for compression and other further uses.
     135              : const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
     136              : 
     137              : const BYTE_UNCOMPRESSED: u8 = 0x80;
     138              : const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
     139              : 
     140              : /// A wrapper of `VirtualFile` that allows users to write blobs.
     141              : ///
     142              : /// If a `BlobWriter` is dropped, the internal buffer will be
     143              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
     144              : /// manually before dropping.
     145              : pub struct BlobWriter<const BUFFERED: bool> {
     146              :     inner: VirtualFile,
     147              :     offset: u64,
     148              :     /// A buffer to save on write calls, only used if BUFFERED=true
     149              :     buf: Vec<u8>,
     150              :     /// We do tiny writes for the length headers; they need to be in an owned buffer;
     151              :     io_buf: Option<BytesMut>,
     152              : }
     153              : 
     154              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     155         1600 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     156         1600 :         Self {
     157         1600 :             inner,
     158         1600 :             offset: start_offset,
     159         1600 :             buf: Vec::with_capacity(Self::CAPACITY),
     160         1600 :             io_buf: Some(BytesMut::new()),
     161         1600 :         }
     162         1600 :     }
     163              : 
     164      2025538 :     pub fn size(&self) -> u64 {
     165      2025538 :         self.offset
     166      2025538 :     }
     167              : 
     168              :     const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
     169              : 
     170              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     171              :     /// You need to make sure that the internal buffer is empty, otherwise
     172              :     /// data will be written in wrong order.
     173              :     #[inline(always)]
     174      1083960 :     async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     175      1083960 :         &mut self,
     176      1083960 :         src_buf: B,
     177      1083960 :         ctx: &RequestContext,
     178      1083960 :     ) -> (B::Buf, Result<(), Error>) {
     179      1083960 :         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
     180      1083960 :         let nbytes = match res {
     181      1083960 :             Ok(nbytes) => nbytes,
     182            0 :             Err(e) => return (src_buf, Err(e)),
     183              :         };
     184      1083960 :         self.offset += nbytes as u64;
     185      1083960 :         (src_buf, Ok(()))
     186      1083960 :     }
     187              : 
     188              :     #[inline(always)]
     189              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     190        10156 :     pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
     191        10156 :         let buf = std::mem::take(&mut self.buf);
     192        10156 :         let (mut buf, res) = self.inner.write_all(buf, ctx).await;
     193        10156 :         res?;
     194        10156 :         buf.clear();
     195        10156 :         self.buf = buf;
     196        10156 :         Ok(())
     197        10156 :     }
     198              : 
     199              :     #[inline(always)]
     200              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     201     12922370 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     202     12922370 :         let remaining = Self::CAPACITY - self.buf.len();
     203     12922370 :         let to_copy = src_buf.len().min(remaining);
     204     12922370 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     205     12922370 :         self.offset += to_copy as u64;
     206     12922370 :         to_copy
     207     12922370 :     }
     208              : 
     209              :     /// Internal, possibly buffered, write function
     210     13997588 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     211     13997588 :         &mut self,
     212     13997588 :         src_buf: B,
     213     13997588 :         ctx: &RequestContext,
     214     13997588 :     ) -> (B::Buf, Result<(), Error>) {
     215     13997588 :         if !BUFFERED {
     216      1083736 :             assert!(self.buf.is_empty());
     217      1083736 :             return self.write_all_unbuffered(src_buf, ctx).await;
     218     12913852 :         }
     219     12913852 :         let remaining = Self::CAPACITY - self.buf.len();
     220     12913852 :         let src_buf_len = src_buf.bytes_init();
     221     12913852 :         if src_buf_len == 0 {
     222           18 :             return (Slice::into_inner(src_buf.slice_full()), Ok(()));
     223     12913834 :         }
     224     12913834 :         let mut src_buf = src_buf.slice(0..src_buf_len);
     225     12913834 :         // First try to copy as much as we can into the buffer
     226     12913834 :         if remaining > 0 {
     227     12913834 :             let copied = self.write_into_buffer(&src_buf);
     228     12913834 :             src_buf = src_buf.slice(copied..);
     229     12913834 :         }
     230              :         // Then, if the buffer is full, flush it out
     231     12913834 :         if self.buf.len() == Self::CAPACITY {
     232         8800 :             if let Err(e) = self.flush_buffer(ctx).await {
     233            0 :                 return (Slice::into_inner(src_buf), Err(e));
     234         8800 :             }
     235     12905034 :         }
     236              :         // Finally, write the tail of src_buf:
     237              :         // If it wholly fits into the buffer without
     238              :         // completely filling it, then put it there.
     239              :         // If not, write it out directly.
     240     12913834 :         let src_buf = if !src_buf.is_empty() {
     241         8760 :             assert_eq!(self.buf.len(), 0);
     242         8760 :             if src_buf.len() < Self::CAPACITY {
     243         8536 :                 let copied = self.write_into_buffer(&src_buf);
     244         8536 :                 // We just verified above that src_buf fits into our internal buffer.
     245         8536 :                 assert_eq!(copied, src_buf.len());
     246         8536 :                 Slice::into_inner(src_buf)
     247              :             } else {
     248          224 :                 let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
     249          224 :                 if let Err(e) = res {
     250            0 :                     return (src_buf, Err(e));
     251          224 :                 }
     252          224 :                 src_buf
     253              :             }
     254              :         } else {
     255     12905074 :             Slice::into_inner(src_buf)
     256              :         };
     257     12913834 :         (src_buf, Ok(()))
     258     13997588 :     }
     259              : 
     260              :     /// Write a blob of data. Returns the offset that it was written to,
     261              :     /// which can be used to retrieve the data later.
     262       545994 :     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     263       545994 :         &mut self,
     264       545994 :         srcbuf: B,
     265       545994 :         ctx: &RequestContext,
     266       545994 :     ) -> (B::Buf, Result<u64, Error>) {
     267       550802 :         self.write_blob_maybe_compressed(srcbuf, ctx, None).await
     268       545994 :     }
     269              : 
     270              :     /// Write a blob of data. Returns the offset that it was written to,
     271              :     /// which can be used to retrieve the data later.
     272      6998794 :     pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     273      6998794 :         &mut self,
     274      6998794 :         srcbuf: B,
     275      6998794 :         ctx: &RequestContext,
     276      6998794 :         algorithm: Option<ImageCompressionAlgorithm>,
     277      6998794 :     ) -> (B::Buf, Result<u64, Error>) {
     278      6998794 :         let offset = self.offset;
     279      6998794 : 
     280      6998794 :         let len = srcbuf.bytes_init();
     281      6998794 : 
     282      6998794 :         let mut io_buf = self.io_buf.take().expect("we always put it back below");
     283      6998794 :         io_buf.clear();
     284      6998794 :         let mut compressed_buf = None;
     285      6998794 :         let ((io_buf, hdr_res), srcbuf) = async {
     286      6998794 :             if len < 128 {
     287      6998794 :                 // Short blob. Write a 1-byte length header
     288      6998794 :                 io_buf.put_u8(len as u8);
     289      6986518 :                 (
     290      6986518 :                     self.write_all(io_buf, ctx).await,
     291      6998794 :                     srcbuf.slice_full().into_inner(),
     292      6998794 :                 )
     293      6998794 :             } else {
     294      6998794 :                 // Write a 4-byte length header
     295      6998794 :                 if len > MAX_SUPPORTED_LEN {
     296      6998794 :                     return (
     297            0 :                         (
     298            0 :                             io_buf,
     299            0 :                             Err(Error::new(
     300            0 :                                 ErrorKind::Other,
     301            0 :                                 format!("blob too large ({len} bytes)"),
     302            0 :                             )),
     303            0 :                         ),
     304            0 :                         srcbuf.slice_full().into_inner(),
     305            0 :                     );
     306      6998794 :                 }
     307      6998794 :                 let (high_bit_mask, len_written, srcbuf) = match algorithm {
     308      6998794 :                     Some(ImageCompressionAlgorithm::Zstd { level }) => {
     309      6998794 :                         let mut encoder = if let Some(level) = level {
     310      6998794 :                             async_compression::tokio::write::ZstdEncoder::with_quality(
     311           12 :                                 Vec::new(),
     312           12 :                                 Level::Precise(level.into()),
     313           12 :                             )
     314      6998794 :                         } else {
     315      6998794 :                             async_compression::tokio::write::ZstdEncoder::new(Vec::new())
     316      6998794 :                         };
     317      6998794 :                         let slice = srcbuf.slice_full();
     318           12 :                         encoder.write_all(&slice[..]).await.unwrap();
     319           12 :                         encoder.shutdown().await.unwrap();
     320           12 :                         let compressed = encoder.into_inner();
     321           12 :                         if compressed.len() < len {
     322      6998794 :                             let compressed_len = compressed.len();
     323            4 :                             compressed_buf = Some(compressed);
     324            4 :                             (BYTE_ZSTD, compressed_len, slice.into_inner())
     325      6998794 :                         } else {
     326      6998794 :                             (BYTE_UNCOMPRESSED, len, slice.into_inner())
     327      6998794 :                         }
     328      6998794 :                     }
     329      6998794 :                     None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
     330      6998794 :                 };
     331      6998794 :                 let mut len_buf = (len_written as u32).to_be_bytes();
     332        12276 :                 assert_eq!(len_buf[0] & 0xf0, 0);
     333      6998794 :                 len_buf[0] |= high_bit_mask;
     334        12276 :                 io_buf.extend_from_slice(&len_buf[..]);
     335        12276 :                 (self.write_all(io_buf, ctx).await, srcbuf)
     336      6998794 :             }
     337      6998794 :         }
     338       276066 :         .await;
     339      6998794 :         self.io_buf = Some(io_buf);
     340      6998794 :         match hdr_res {
     341      6998794 :             Ok(_) => (),
     342            0 :             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
     343              :         }
     344      6998794 :         let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
     345            4 :             let (_buf, res) = self.write_all(compressed_buf, ctx).await;
     346            4 :             (Slice::into_inner(srcbuf.slice(..)), res)
     347              :         } else {
     348      6998790 :             self.write_all(srcbuf, ctx).await
     349              :         };
     350      6998794 :         (srcbuf, res.map(|_| offset))
     351      6998794 :     }
     352              : }
     353              : 
     354              : impl BlobWriter<true> {
     355              :     /// Access the underlying `VirtualFile`.
     356              :     ///
     357              :     /// This function flushes the internal buffer before giving access
     358              :     /// to the underlying `VirtualFile`.
     359         1328 :     pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
     360         1328 :         self.flush_buffer(ctx).await?;
     361         1328 :         Ok(self.inner)
     362         1328 :     }
     363              : 
     364              :     /// Access the underlying `VirtualFile`.
     365              :     ///
     366              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     367              :     /// the internal buffer before giving access.
     368            0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     369            0 :         self.inner
     370            0 :     }
     371              : }
     372              : 
     373              : impl BlobWriter<false> {
     374              :     /// Access the underlying `VirtualFile`.
     375          244 :     pub fn into_inner(self) -> VirtualFile {
     376          244 :         self.inner
     377          244 :     }
     378              : }
     379              : 
     380              : #[cfg(test)]
     381              : mod tests {
     382              :     use super::*;
     383              :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     384              :     use rand::{Rng, SeedableRng};
     385              : 
     386           24 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     387        15054 :         round_trip_test_compressed::<BUFFERED, 0>(blobs).await
     388           24 :     }
     389              : 
     390           28 :     async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
     391           28 :         blobs: &[Vec<u8>],
     392           28 :     ) -> Result<(), Error> {
     393           28 :         let temp_dir = camino_tempfile::tempdir()?;
     394           28 :         let pathbuf = temp_dir.path().join("file");
     395           28 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     396           28 : 
     397           28 :         // Write part (in block to drop the file)
     398           28 :         let mut offsets = Vec::new();
     399              :         {
     400           28 :             let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
     401           28 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     402         8272 :             for blob in blobs.iter() {
     403         8272 :                 let (_, res) = match COMPRESSION {
     404         8248 :                     0 => wtr.write_blob(blob.clone(), &ctx).await,
     405              :                     1 => {
     406           24 :                         wtr.write_blob_maybe_compressed(
     407           24 :                             blob.clone(),
     408           24 :                             &ctx,
     409           24 :                             Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
     410           24 :                         )
     411           15 :                         .await
     412              :                     }
     413            0 :                     _ => unreachable!("Invalid compression {COMPRESSION}"),
     414              :                 };
     415         8272 :                 let offs = res?;
     416         8272 :                 offsets.push(offs);
     417              :             }
     418              :             // Write out one page worth of zeros so that we can
     419              :             // read again with read_blk
     420           28 :             let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
     421           28 :             let offs = res?;
     422           28 :             println!("Writing final blob at offs={offs}");
     423           28 :             wtr.flush_buffer(&ctx).await?;
     424              :         }
     425              : 
     426           28 :         let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
     427           28 :         let rdr = BlockReaderRef::VirtualFile(&file);
     428           28 :         let rdr = BlockCursor::new(rdr);
     429         8272 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     430        10611 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     431         8272 :             assert_eq!(
     432         8272 :                 blob, &blob_read,
     433            0 :                 "mismatch for idx={idx} at offset={offset}"
     434              :             );
     435              :         }
     436           28 :         Ok(())
     437           28 :     }
     438              : 
     439         4106 :     fn random_array(len: usize) -> Vec<u8> {
     440         4106 :         let mut rng = rand::thread_rng();
     441     50145888 :         (0..len).map(|_| rng.gen()).collect::<_>()
     442         4106 :     }
     443              : 
     444              :     #[tokio::test]
     445            2 :     async fn test_one() -> Result<(), Error> {
     446            2 :         let blobs = &[vec![12, 21, 22]];
     447            8 :         round_trip_test::<false>(blobs).await?;
     448            4 :         round_trip_test::<true>(blobs).await?;
     449            2 :         Ok(())
     450            2 :     }
     451              : 
     452              :     #[tokio::test]
     453            2 :     async fn test_hello_simple() -> Result<(), Error> {
     454            2 :         let blobs = &[
     455            2 :             vec![0, 1, 2, 3],
     456            2 :             b"Hello, World!".to_vec(),
     457            2 :             Vec::new(),
     458            2 :             b"foobar".to_vec(),
     459            2 :         ];
     460           16 :         round_trip_test::<false>(blobs).await?;
     461            7 :         round_trip_test::<true>(blobs).await?;
     462            2 :         Ok(())
     463            2 :     }
     464              : 
     465              :     #[tokio::test]
     466            2 :     async fn test_really_big_array() -> Result<(), Error> {
     467            2 :         let blobs = &[
     468            2 :             b"test".to_vec(),
     469            2 :             random_array(10 * PAGE_SZ),
     470            2 :             b"hello".to_vec(),
     471            2 :             random_array(66 * PAGE_SZ),
     472            2 :             vec![0xf3; 24 * PAGE_SZ],
     473            2 :             b"foobar".to_vec(),
     474            2 :         ];
     475          124 :         round_trip_test::<false>(blobs).await?;
     476          116 :         round_trip_test::<true>(blobs).await?;
     477          100 :         round_trip_test_compressed::<false, 1>(blobs).await?;
     478           89 :         round_trip_test_compressed::<true, 1>(blobs).await?;
     479            2 :         Ok(())
     480            2 :     }
     481              : 
     482              :     #[tokio::test]
     483            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     484            2 :         let blobs = (0..PAGE_SZ / 8)
     485         2048 :             .map(|v| random_array(v * 16))
     486            2 :             .collect::<Vec<_>>();
     487         4162 :         round_trip_test::<false>(&blobs).await?;
     488         2212 :         round_trip_test::<true>(&blobs).await?;
     489            2 :         Ok(())
     490            2 :     }
     491              : 
     492              :     #[tokio::test]
     493            2 :     async fn test_arrays_random_size() -> Result<(), Error> {
     494            2 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     495            2 :         let blobs = (0..1024)
     496         2048 :             .map(|_| {
     497         2048 :                 let mut sz: u16 = rng.gen();
     498         2048 :                 // Make 50% of the arrays small
     499         2048 :                 if rng.gen() {
     500         1032 :                     sz &= 63;
     501         1032 :                 }
     502         2048 :                 random_array(sz.into())
     503         2048 :             })
     504            2 :             .collect::<Vec<_>>();
     505         5106 :         round_trip_test::<false>(&blobs).await?;
     506         3279 :         round_trip_test::<true>(&blobs).await?;
     507            2 :         Ok(())
     508            2 :     }
     509              : 
     510              :     #[tokio::test]
     511            2 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     512            2 :         let blobs = &[
     513            2 :             random_array(PAGE_SZ - 4),
     514            2 :             random_array(PAGE_SZ - 4),
     515            2 :             random_array(PAGE_SZ - 4),
     516            2 :         ];
     517           14 :         round_trip_test::<false>(blobs).await?;
     518            6 :         round_trip_test::<true>(blobs).await?;
     519            2 :         Ok(())
     520            2 :     }
     521              : }
        

Generated by: LCOV version 2.1-beta