LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 93.7 % 378 354
Test Date: 2024-07-21 16:16:09 Functions: 84.8 % 92 78

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as a one byte. If it's larger than that, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9              : //! by peeking at the first byte. For blobs larger than 128 bits,
      10              : //! we also specify three reserved bits, only one of the three bit
      11              : //! patterns is currently in use (0b011) and signifies compression
      12              : //! with zstd.
      13              : //!
      14              : //! len <  128: 0XXXXXXX
      15              : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      16              : //!
      17              : use async_compression::Level;
      18              : use bytes::{BufMut, BytesMut};
      19              : use pageserver_api::models::ImageCompressionAlgorithm;
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
      22              : use tracing::warn;
      23              : 
      24              : use crate::context::RequestContext;
      25              : use crate::page_cache::PAGE_SZ;
      26              : use crate::tenant::block_io::BlockCursor;
      27              : use crate::virtual_file::VirtualFile;
      28              : use std::cmp::min;
      29              : use std::io::{Error, ErrorKind};
      30              : 
      31              : impl<'a> BlockCursor<'a> {
      32              :     /// Read a blob into a new buffer.
      33      2598546 :     pub async fn read_blob(
      34      2598546 :         &self,
      35      2598546 :         offset: u64,
      36      2598546 :         ctx: &RequestContext,
      37      2598546 :     ) -> Result<Vec<u8>, std::io::Error> {
      38      2598546 :         let mut buf = Vec::new();
      39      2598546 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      40      2598546 :         Ok(buf)
      41      2598546 :     }
      42              :     /// Read blob into the given buffer. Any previous contents in the buffer
      43              :     /// are overwritten.
      44      7106080 :     pub async fn read_blob_into_buf(
      45      7106080 :         &self,
      46      7106080 :         offset: u64,
      47      7106080 :         dstbuf: &mut Vec<u8>,
      48      7106080 :         ctx: &RequestContext,
      49      7106080 :     ) -> Result<(), std::io::Error> {
      50      7106080 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      51      7106080 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      52              : 
      53      7106080 :         let mut buf = self.read_blk(blknum, ctx).await?;
      54              : 
      55              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      56      7106080 :         let first_len_byte = buf[off];
      57      7106080 :         let len: usize = if first_len_byte < 0x80 {
      58              :             // 1-byte length header
      59      7091258 :             off += 1;
      60      7091258 :             first_len_byte as usize
      61              :         } else {
      62              :             // 4-byte length header
      63        14822 :             let mut len_buf = [0u8; 4];
      64        14822 :             let thislen = PAGE_SZ - off;
      65        14822 :             if thislen < 4 {
      66              :                 // it is split across two pages
      67            1 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      68            1 :                 blknum += 1;
      69            1 :                 buf = self.read_blk(blknum, ctx).await?;
      70            1 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      71            1 :                 off = 4 - thislen;
      72        14821 :             } else {
      73        14821 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      74        14821 :                 off += 4;
      75        14821 :             }
      76        14822 :             let bit_mask = if self.read_compressed {
      77           22 :                 !LEN_COMPRESSION_BIT_MASK
      78              :             } else {
      79        14800 :                 0x7f
      80              :             };
      81        14822 :             len_buf[0] &= bit_mask;
      82        14822 :             u32::from_be_bytes(len_buf) as usize
      83              :         };
      84      7106080 :         let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
      85      7106080 : 
      86      7106080 :         let mut tmp_buf = Vec::new();
      87              :         let buf_to_write;
      88      7106080 :         let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
      89      7106076 :             if compression_bits > BYTE_UNCOMPRESSED {
      90            0 :                 warn!("reading key above future limit ({len} bytes)");
      91      7106076 :             }
      92      7106076 :             buf_to_write = dstbuf;
      93      7106076 :             None
      94            4 :         } else if compression_bits == BYTE_ZSTD {
      95            4 :             buf_to_write = &mut tmp_buf;
      96            4 :             Some(dstbuf)
      97              :         } else {
      98            0 :             let error = std::io::Error::new(
      99            0 :                 std::io::ErrorKind::InvalidData,
     100            0 :                 format!("invalid compression byte {compression_bits:x}"),
     101            0 :             );
     102            0 :             return Err(error);
     103              :         };
     104              : 
     105      7106080 :         buf_to_write.clear();
     106      7106080 :         buf_to_write.reserve(len);
     107      7106080 : 
     108      7106080 :         // Read the payload
     109      7106080 :         let mut remain = len;
     110     14295618 :         while remain > 0 {
     111      7189538 :             let mut page_remain = PAGE_SZ - off;
     112      7189538 :             if page_remain == 0 {
     113              :                 // continue on next page
     114        84139 :                 blknum += 1;
     115        84139 :                 buf = self.read_blk(blknum, ctx).await?;
     116        84139 :                 off = 0;
     117        84139 :                 page_remain = PAGE_SZ;
     118      7105399 :             }
     119      7189538 :             let this_blk_len = min(remain, page_remain);
     120      7189538 :             buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
     121      7189538 :             remain -= this_blk_len;
     122      7189538 :             off += this_blk_len;
     123              :         }
     124              : 
     125      7106080 :         if let Some(dstbuf) = compression {
     126            4 :             if compression_bits == BYTE_ZSTD {
     127            4 :                 let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
     128            4 :                 decoder.write_all(buf_to_write).await?;
     129            4 :                 decoder.flush().await?;
     130              :             } else {
     131            0 :                 unreachable!("already checked above")
     132              :             }
     133      7106076 :         }
     134              : 
     135      7106080 :         Ok(())
     136      7106080 :     }
     137              : }
     138              : 
     139              : /// Reserved bits for length and compression
     140              : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
     141              : 
     142              : /// The maximum size of blobs we support. The highest few bits
     143              : /// are reserved for compression and other further uses.
     144              : const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
     145              : 
     146              : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
     147              : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
     148              : 
     149              : /// A wrapper of `VirtualFile` that allows users to write blobs.
     150              : ///
     151              : /// If a `BlobWriter` is dropped, the internal buffer will be
     152              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
     153              : /// manually before dropping.
     154              : pub struct BlobWriter<const BUFFERED: bool> {
     155              :     inner: VirtualFile,
     156              :     offset: u64,
     157              :     /// A buffer to save on write calls, only used if BUFFERED=true
     158              :     buf: Vec<u8>,
     159              :     /// We do tiny writes for the length headers; they need to be in an owned buffer;
     160              :     io_buf: Option<BytesMut>,
     161              : }
     162              : 
     163              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     164         1636 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     165         1636 :         Self {
     166         1636 :             inner,
     167         1636 :             offset: start_offset,
     168         1636 :             buf: Vec::with_capacity(Self::CAPACITY),
     169         1636 :             io_buf: Some(BytesMut::new()),
     170         1636 :         }
     171         1636 :     }
     172              : 
     173      2025806 :     pub fn size(&self) -> u64 {
     174      2025806 :         self.offset
     175      2025806 :     }
     176              : 
     177              :     const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
     178              : 
     179              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     180              :     /// You need to make sure that the internal buffer is empty, otherwise
     181              :     /// data will be written in wrong order.
     182              :     #[inline(always)]
     183      1084082 :     async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     184      1084082 :         &mut self,
     185      1084082 :         src_buf: B,
     186      1084082 :         ctx: &RequestContext,
     187      1084082 :     ) -> (B::Buf, Result<(), Error>) {
     188      1084082 :         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
     189      1084082 :         let nbytes = match res {
     190      1084082 :             Ok(nbytes) => nbytes,
     191            0 :             Err(e) => return (src_buf, Err(e)),
     192              :         };
     193      1084082 :         self.offset += nbytes as u64;
     194      1084082 :         (src_buf, Ok(()))
     195      1084082 :     }
     196              : 
     197              :     #[inline(always)]
     198              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     199        10708 :     pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
     200        10708 :         let buf = std::mem::take(&mut self.buf);
     201        10708 :         let (mut buf, res) = self.inner.write_all(buf, ctx).await;
     202        10708 :         res?;
     203        10708 :         buf.clear();
     204        10708 :         self.buf = buf;
     205        10708 :         Ok(())
     206        10708 :     }
     207              : 
     208              :     #[inline(always)]
     209              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     210     12943236 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     211     12943236 :         let remaining = Self::CAPACITY - self.buf.len();
     212     12943236 :         let to_copy = src_buf.len().min(remaining);
     213     12943236 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     214     12943236 :         self.offset += to_copy as u64;
     215     12943236 :         to_copy
     216     12943236 :     }
     217              : 
     218              :     /// Internal, possibly buffered, write function
     219     14018060 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     220     14018060 :         &mut self,
     221     14018060 :         src_buf: B,
     222     14018060 :         ctx: &RequestContext,
     223     14018060 :     ) -> (B::Buf, Result<(), Error>) {
     224     14018060 :         if !BUFFERED {
     225      1083852 :             assert!(self.buf.is_empty());
     226      1083852 :             return self.write_all_unbuffered(src_buf, ctx).await;
     227     12934208 :         }
     228     12934208 :         let remaining = Self::CAPACITY - self.buf.len();
     229     12934208 :         let src_buf_len = src_buf.bytes_init();
     230     12934208 :         if src_buf_len == 0 {
     231           24 :             return (Slice::into_inner(src_buf.slice_full()), Ok(()));
     232     12934184 :         }
     233     12934184 :         let mut src_buf = src_buf.slice(0..src_buf_len);
     234     12934184 :         // First try to copy as much as we can into the buffer
     235     12934184 :         if remaining > 0 {
     236     12934184 :             let copied = self.write_into_buffer(&src_buf);
     237     12934184 :             src_buf = src_buf.slice(copied..);
     238     12934184 :         }
     239              :         // Then, if the buffer is full, flush it out
     240     12934184 :         if self.buf.len() == Self::CAPACITY {
     241         9322 :             if let Err(e) = self.flush_buffer(ctx).await {
     242            0 :                 return (Slice::into_inner(src_buf), Err(e));
     243         9322 :             }
     244     12924862 :         }
     245              :         // Finally, write the tail of src_buf:
     246              :         // If it wholly fits into the buffer without
     247              :         // completely filling it, then put it there.
     248              :         // If not, write it out directly.
     249     12934184 :         let src_buf = if !src_buf.is_empty() {
     250         9282 :             assert_eq!(self.buf.len(), 0);
     251         9282 :             if src_buf.len() < Self::CAPACITY {
     252         9052 :                 let copied = self.write_into_buffer(&src_buf);
     253         9052 :                 // We just verified above that src_buf fits into our internal buffer.
     254         9052 :                 assert_eq!(copied, src_buf.len());
     255         9052 :                 Slice::into_inner(src_buf)
     256              :             } else {
     257          230 :                 let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
     258          230 :                 if let Err(e) = res {
     259            0 :                     return (src_buf, Err(e));
     260          230 :                 }
     261          230 :                 src_buf
     262              :             }
     263              :         } else {
     264     12924902 :             Slice::into_inner(src_buf)
     265              :         };
     266     12934184 :         (src_buf, Ok(()))
     267     14018060 :     }
     268              : 
     269              :     /// Write a blob of data. Returns the offset that it was written to,
     270              :     /// which can be used to retrieve the data later.
     271        10348 :     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     272        10348 :         &mut self,
     273        10348 :         srcbuf: B,
     274        10348 :         ctx: &RequestContext,
     275        10348 :     ) -> (B::Buf, Result<u64, Error>) {
     276        10348 :         self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
     277         4712 :             .await
     278        10348 :     }
     279              : 
     280              :     /// Write a blob of data. Returns the offset that it was written to,
     281              :     /// which can be used to retrieve the data later.
     282      7009030 :     pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     283      7009030 :         &mut self,
     284      7009030 :         srcbuf: B,
     285      7009030 :         ctx: &RequestContext,
     286      7009030 :         algorithm: ImageCompressionAlgorithm,
     287      7009030 :     ) -> (B::Buf, Result<u64, Error>) {
     288      7009030 :         let offset = self.offset;
     289      7009030 : 
     290      7009030 :         let len = srcbuf.bytes_init();
     291      7009030 : 
     292      7009030 :         let mut io_buf = self.io_buf.take().expect("we always put it back below");
     293      7009030 :         io_buf.clear();
     294      7009030 :         let mut compressed_buf = None;
     295      7009030 :         let ((io_buf, hdr_res), srcbuf) = async {
     296      7009030 :             if len < 128 {
     297      7009030 :                 // Short blob. Write a 1-byte length header
     298      7009030 :                 io_buf.put_u8(len as u8);
     299      6992666 :                 (
     300      6992666 :                     self.write_all(io_buf, ctx).await,
     301      7009030 :                     srcbuf.slice_full().into_inner(),
     302      7009030 :                 )
     303      7009030 :             } else {
     304      7009030 :                 // Write a 4-byte length header
     305      7009030 :                 if len > MAX_SUPPORTED_LEN {
     306      7009030 :                     return (
     307            0 :                         (
     308            0 :                             io_buf,
     309            0 :                             Err(Error::new(
     310            0 :                                 ErrorKind::Other,
     311            0 :                                 format!("blob too large ({len} bytes)"),
     312            0 :                             )),
     313            0 :                         ),
     314            0 :                         srcbuf.slice_full().into_inner(),
     315            0 :                     );
     316      7009030 :                 }
     317      7009030 :                 let (high_bit_mask, len_written, srcbuf) = match algorithm {
     318      7009030 :                     ImageCompressionAlgorithm::Zstd { level } => {
     319      7009030 :                         let mut encoder = if let Some(level) = level {
     320      7009030 :                             async_compression::tokio::write::ZstdEncoder::with_quality(
     321         2050 :                                 Vec::new(),
     322         2050 :                                 Level::Precise(level.into()),
     323         2050 :                             )
     324      7009030 :                         } else {
     325      7009030 :                             async_compression::tokio::write::ZstdEncoder::new(Vec::new())
     326      7009030 :                         };
     327      7009030 :                         let slice = srcbuf.slice_full();
     328         2050 :                         encoder.write_all(&slice[..]).await.unwrap();
     329         2050 :                         encoder.shutdown().await.unwrap();
     330         2050 :                         let compressed = encoder.into_inner();
     331         2050 :                         if compressed.len() < len {
     332      7009030 :                             let compressed_len = compressed.len();
     333            6 :                             compressed_buf = Some(compressed);
     334            6 :                             (BYTE_ZSTD, compressed_len, slice.into_inner())
     335      7009030 :                         } else {
     336      7009030 :                             (BYTE_UNCOMPRESSED, len, slice.into_inner())
     337      7009030 :                         }
     338      7009030 :                     }
     339      7009030 :                     ImageCompressionAlgorithm::Disabled => {
     340      7009030 :                         (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
     341      7009030 :                     }
     342      7009030 :                 };
     343      7009030 :                 let mut len_buf = (len_written as u32).to_be_bytes();
     344        16364 :                 assert_eq!(len_buf[0] & 0xf0, 0);
     345      7009030 :                 len_buf[0] |= high_bit_mask;
     346        16364 :                 io_buf.extend_from_slice(&len_buf[..]);
     347        16364 :                 (self.write_all(io_buf, ctx).await, srcbuf)
     348      7009030 :             }
     349      7009030 :         }
     350       276099 :         .await;
     351      7009030 :         self.io_buf = Some(io_buf);
     352      7009030 :         match hdr_res {
     353      7009030 :             Ok(_) => (),
     354            0 :             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
     355              :         }
     356      7009030 :         let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
     357            6 :             let (_buf, res) = self.write_all(compressed_buf, ctx).await;
     358            6 :             (Slice::into_inner(srcbuf.slice(..)), res)
     359              :         } else {
     360      7009024 :             self.write_all(srcbuf, ctx).await
     361              :         };
     362      7009030 :         (srcbuf, res.map(|_| offset))
     363      7009030 :     }
     364              : }
     365              : 
     366              : impl BlobWriter<true> {
     367              :     /// Access the underlying `VirtualFile`.
     368              :     ///
     369              :     /// This function flushes the internal buffer before giving access
     370              :     /// to the underlying `VirtualFile`.
     371         1346 :     pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
     372         1346 :         self.flush_buffer(ctx).await?;
     373         1346 :         Ok(self.inner)
     374         1346 :     }
     375              : 
     376              :     /// Access the underlying `VirtualFile`.
     377              :     ///
     378              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     379              :     /// the internal buffer before giving access.
     380            0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     381            0 :         self.inner
     382            0 :     }
     383              : }
     384              : 
     385              : impl BlobWriter<false> {
     386              :     /// Access the underlying `VirtualFile`.
     387          250 :     pub fn into_inner(self) -> VirtualFile {
     388          250 :         self.inner
     389          250 :     }
     390              : }
     391              : 
     392              : #[cfg(test)]
     393              : pub(crate) mod tests {
     394              :     use super::*;
     395              :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     396              :     use camino::Utf8PathBuf;
     397              :     use camino_tempfile::Utf8TempDir;
     398              :     use rand::{Rng, SeedableRng};
     399              : 
     400           24 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     401        15054 :         round_trip_test_compressed::<BUFFERED>(blobs, false).await
     402           24 :     }
     403              : 
     404           40 :     pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
     405           40 :         blobs: &[Vec<u8>],
     406           40 :         compression: bool,
     407           40 :         ctx: &RequestContext,
     408           40 :     ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
     409           40 :         let temp_dir = camino_tempfile::tempdir()?;
     410           40 :         let pathbuf = temp_dir.path().join("file");
     411           40 : 
     412           40 :         // Write part (in block to drop the file)
     413           40 :         let mut offsets = Vec::new();
     414              :         {
     415           40 :             let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
     416           40 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     417        12408 :             for blob in blobs.iter() {
     418        12408 :                 let (_, res) = if compression {
     419         2100 :                     wtr.write_blob_maybe_compressed(
     420         2100 :                         blob.clone(),
     421         2100 :                         ctx,
     422         2100 :                         ImageCompressionAlgorithm::Zstd { level: Some(1) },
     423         2100 :                     )
     424          154 :                     .await
     425              :                 } else {
     426        10308 :                     wtr.write_blob(blob.clone(), ctx).await
     427              :                 };
     428        12408 :                 let offs = res?;
     429        12408 :                 offsets.push(offs);
     430              :             }
     431              :             // Write out one page worth of zeros so that we can
     432              :             // read again with read_blk
     433           40 :             let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
     434           40 :             let offs = res?;
     435           40 :             println!("Writing final blob at offs={offs}");
     436           40 :             wtr.flush_buffer(ctx).await?;
     437              :         }
     438           40 :         Ok((temp_dir, pathbuf, offsets))
     439           40 :     }
     440              : 
     441           32 :     async fn round_trip_test_compressed<const BUFFERED: bool>(
     442           32 :         blobs: &[Vec<u8>],
     443           32 :         compression: bool,
     444           32 :     ) -> Result<(), Error> {
     445           32 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     446           32 :         let (_temp_dir, pathbuf, offsets) =
     447         4630 :             write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
     448              : 
     449           32 :         let file = VirtualFile::open(pathbuf, &ctx).await?;
     450           32 :         let rdr = BlockReaderRef::VirtualFile(&file);
     451           32 :         let rdr = BlockCursor::new_with_compression(rdr, compression);
     452         8288 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     453        10619 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     454         8288 :             assert_eq!(
     455         8288 :                 blob, &blob_read,
     456            0 :                 "mismatch for idx={idx} at offset={offset}"
     457              :             );
     458              :         }
     459           32 :         Ok(())
     460           32 :     }
     461              : 
     462         6158 :     pub(crate) fn random_array(len: usize) -> Vec<u8> {
     463         6158 :         let mut rng = rand::thread_rng();
     464     68151904 :         (0..len).map(|_| rng.gen()).collect::<_>()
     465         6158 :     }
     466              : 
     467              :     #[tokio::test]
     468            2 :     async fn test_one() -> Result<(), Error> {
     469            2 :         let blobs = &[vec![12, 21, 22]];
     470            8 :         round_trip_test::<false>(blobs).await?;
     471            4 :         round_trip_test::<true>(blobs).await?;
     472            2 :         Ok(())
     473            2 :     }
     474              : 
     475              :     #[tokio::test]
     476            2 :     async fn test_hello_simple() -> Result<(), Error> {
     477            2 :         let blobs = &[
     478            2 :             vec![0, 1, 2, 3],
     479            2 :             b"Hello, World!".to_vec(),
     480            2 :             Vec::new(),
     481            2 :             b"foobar".to_vec(),
     482            2 :         ];
     483           16 :         round_trip_test::<false>(blobs).await?;
     484            7 :         round_trip_test::<true>(blobs).await?;
     485           15 :         round_trip_test_compressed::<false>(blobs, true).await?;
     486            7 :         round_trip_test_compressed::<true>(blobs, true).await?;
     487            2 :         Ok(())
     488            2 :     }
     489              : 
     490              :     #[tokio::test]
     491            2 :     async fn test_really_big_array() -> Result<(), Error> {
     492            2 :         let blobs = &[
     493            2 :             b"test".to_vec(),
     494            2 :             random_array(10 * PAGE_SZ),
     495            2 :             b"hello".to_vec(),
     496            2 :             random_array(66 * PAGE_SZ),
     497            2 :             vec![0xf3; 24 * PAGE_SZ],
     498            2 :             b"foobar".to_vec(),
     499            2 :         ];
     500          124 :         round_trip_test::<false>(blobs).await?;
     501          116 :         round_trip_test::<true>(blobs).await?;
     502          100 :         round_trip_test_compressed::<false>(blobs, true).await?;
     503           89 :         round_trip_test_compressed::<true>(blobs, true).await?;
     504            2 :         Ok(())
     505            2 :     }
     506              : 
     507              :     #[tokio::test]
     508            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     509            2 :         let blobs = (0..PAGE_SZ / 8)
     510         2048 :             .map(|v| random_array(v * 16))
     511            2 :             .collect::<Vec<_>>();
     512         4162 :         round_trip_test::<false>(&blobs).await?;
     513         2212 :         round_trip_test::<true>(&blobs).await?;
     514            2 :         Ok(())
     515            2 :     }
     516              : 
     517              :     #[tokio::test]
     518            2 :     async fn test_arrays_random_size() -> Result<(), Error> {
     519            2 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     520            2 :         let blobs = (0..1024)
     521         2048 :             .map(|_| {
     522         2048 :                 let mut sz: u16 = rng.gen();
     523         2048 :                 // Make 50% of the arrays small
     524         2048 :                 if rng.gen() {
     525         1032 :                     sz &= 63;
     526         1032 :                 }
     527         2048 :                 random_array(sz.into())
     528         2048 :             })
     529            2 :             .collect::<Vec<_>>();
     530         5106 :         round_trip_test::<false>(&blobs).await?;
     531         3279 :         round_trip_test::<true>(&blobs).await?;
     532            2 :         Ok(())
     533            2 :     }
     534              : 
     535              :     #[tokio::test]
     536            2 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     537            2 :         let blobs = &[
     538            2 :             random_array(PAGE_SZ - 4),
     539            2 :             random_array(PAGE_SZ - 4),
     540            2 :             random_array(PAGE_SZ - 4),
     541            2 :         ];
     542           14 :         round_trip_test::<false>(blobs).await?;
     543            6 :         round_trip_test::<true>(blobs).await?;
     544            2 :         Ok(())
     545            2 :     }
     546              : }
        

Generated by: LCOV version 2.1-beta