LCOV - code coverage report
Current view:  top level - pageserver/src/tenant - blob_io.rs (source / functions)
Test:          42f947419473a288706e86ecdf7c2863d760d5d7.info
Test Date:     2024-08-02 21:34:27

               Coverage    Total    Hit
Lines:         93.1 %      375      349
Functions:     84.5 %      97       82

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is less than 128, it is written as a
       6              : //! single byte. Otherwise it is written as a four-byte big-endian
       7              : //! integer with the high bit set. This way, a 1-byte header can be
       8              : //! told apart from a 4-byte header by peeking at the first byte.
       9              : //! For blobs of 128 bytes or more, the three bits after the high
      10              : //! bit are reserved; only one of the possible bit patterns is
      11              : //! currently in use (0b001), and it signifies compression
      12              : //! with zstd.
      13              : //!
      14              : //! len <  128: 0XXXXXXX
      15              : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      16              : //!
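
As a worked example of the format above, here is a minimal sketch of the header encoding (illustrative only: `encode_len_header` is not part of this file; the literals mirror `BYTE_UNCOMPRESSED`, `BYTE_ZSTD`, and `MAX_SUPPORTED_LEN` defined further down):

fn encode_len_header(len: u32, zstd: bool) -> Vec<u8> {
    if len < 128 {
        // 1-byte header: 0XXXXXXX (short blobs are never compressed)
        vec![len as u8]
    } else {
        // 4-byte header: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
        assert!(len <= 0x0fff_ffff, "blob too large");
        let mut buf = len.to_be_bytes();
        // 0x80 = high bit only (uncompressed); 0x90 = high bit + CCC = 001 (zstd)
        buf[0] |= if zstd { 0x90 } else { 0x80 };
        buf.to_vec()
    }
}

fn main() {
    assert_eq!(encode_len_header(100, false), vec![0x64]);
    // 1000 = 0x000003e8; the zstd tag sets the top nibble to 0b1001.
    assert_eq!(encode_len_header(1000, true), vec![0x90, 0x00, 0x03, 0xe8]);
}
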
      17              : use async_compression::Level;
      18              : use bytes::{BufMut, BytesMut};
      19              : use pageserver_api::models::ImageCompressionAlgorithm;
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
      22              : use tracing::warn;
      23              : 
      24              : use crate::context::RequestContext;
      25              : use crate::page_cache::PAGE_SZ;
      26              : use crate::tenant::block_io::BlockCursor;
      27              : use crate::virtual_file::VirtualFile;
      28              : use std::cmp::min;
      29              : use std::io::{Error, ErrorKind};
      30              : 
      31              : #[derive(Copy, Clone, Debug)]
      32              : pub struct CompressionInfo {
      33              :     pub written_compressed: bool,
      34              :     pub compressed_size: Option<usize>,
      35              : }
      36              : 
      37              : impl<'a> BlockCursor<'a> {
      38              :     /// Read a blob into a new buffer.
      39      2591663 :     pub async fn read_blob(
      40      2591663 :         &self,
      41      2591663 :         offset: u64,
      42      2591663 :         ctx: &RequestContext,
      43      2591663 :     ) -> Result<Vec<u8>, std::io::Error> {
      44      2591663 :         let mut buf = Vec::new();
      45      2591663 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      46      2591663 :         Ok(buf)
      47      2591663 :     }
      48              :     /// Read a blob into the given buffer. Any previous contents in the buffer
      49              :     /// are overwritten.
      50      6977565 :     pub async fn read_blob_into_buf(
      51      6977565 :         &self,
      52      6977565 :         offset: u64,
      53      6977565 :         dstbuf: &mut Vec<u8>,
      54      6977565 :         ctx: &RequestContext,
      55      6977565 :     ) -> Result<(), std::io::Error> {
      56      6977565 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      57      6977565 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      58              : 
      59      6977565 :         let mut buf = self.read_blk(blknum, ctx).await?;
      60              : 
      61              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      62      6977565 :         let first_len_byte = buf[off];
      63      6977565 :         let len: usize = if first_len_byte < 0x80 {
      64              :             // 1-byte length header
      65      6962745 :             off += 1;
      66      6962745 :             first_len_byte as usize
      67              :         } else {
      68              :             // 4-byte length header
      69        14820 :             let mut len_buf = [0u8; 4];
      70        14820 :             let thislen = PAGE_SZ - off;
      71        14820 :             if thislen < 4 {
      72              :                 // it is split across two pages
      73            1 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      74            1 :                 blknum += 1;
      75            1 :                 buf = self.read_blk(blknum, ctx).await?;
      76            1 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      77            1 :                 off = 4 - thislen;
      78        14819 :             } else {
      79        14819 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      80        14819 :                 off += 4;
      81        14819 :             }
      82        14820 :             let bit_mask = if self.read_compressed {
      83           20 :                 !LEN_COMPRESSION_BIT_MASK
      84              :             } else {
      85        14800 :                 0x7f
      86              :             };
      87        14820 :             len_buf[0] &= bit_mask;
      88        14820 :             u32::from_be_bytes(len_buf) as usize
      89              :         };
      90      6977565 :         let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
      91      6977565 : 
      92      6977565 :         let mut tmp_buf = Vec::new();
      93              :         let buf_to_write;
      94      6977565 :         let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
      95      6977561 :             if compression_bits > BYTE_UNCOMPRESSED {
      96            0 :                 warn!("reading key above future limit ({len} bytes)");
      97      6977561 :             }
      98      6977561 :             buf_to_write = dstbuf;
      99      6977561 :             None
     100            4 :         } else if compression_bits == BYTE_ZSTD {
     101            4 :             buf_to_write = &mut tmp_buf;
     102            4 :             Some(dstbuf)
     103              :         } else {
     104            0 :             let error = std::io::Error::new(
     105            0 :                 std::io::ErrorKind::InvalidData,
     106            0 :                 format!("invalid compression byte {compression_bits:x}"),
     107            0 :             );
     108            0 :             return Err(error);
     109              :         };
     110              : 
     111      6977565 :         buf_to_write.clear();
     112      6977565 :         buf_to_write.reserve(len);
     113      6977565 : 
     114      6977565 :         // Read the payload
     115      6977565 :         let mut remain = len;
     116     14037326 :         while remain > 0 {
     117      7059761 :             let mut page_remain = PAGE_SZ - off;
     118      7059761 :             if page_remain == 0 {
     119              :                 // continue on next page
     120        82884 :                 blknum += 1;
     121        82884 :                 buf = self.read_blk(blknum, ctx).await?;
     122        82884 :                 off = 0;
     123        82884 :                 page_remain = PAGE_SZ;
     124      6976877 :             }
     125      7059761 :             let this_blk_len = min(remain, page_remain);
     126      7059761 :             buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
     127      7059761 :             remain -= this_blk_len;
     128      7059761 :             off += this_blk_len;
     129              :         }
     130              : 
     131      6977565 :         if let Some(dstbuf) = compression {
     132            4 :             if compression_bits == BYTE_ZSTD {
     133            4 :                 let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
     134            4 :                 decoder.write_all(buf_to_write).await?;
     135            4 :                 decoder.flush().await?;
     136              :             } else {
     137            0 :                 unreachable!("already checked above")
     138              :             }
     139      6977561 :         }
     140              : 
     141      6977565 :         Ok(())
     142      6977565 :     }
     143              : }
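
The `thislen < 4` branch in `read_blob_into_buf` above (hit once in this run) covers a 4-byte header that straddles a page boundary. A small sketch of the arithmetic, assuming `PAGE_SZ` is 8192:

fn split_header_offsets() {
    let (page_sz, off) = (8192usize, 8190usize);
    let thislen = page_sz - off;  // 2 header bytes fit on the current page
    assert!(thislen < 4);         // so the header is split across two pages
    let resume_off = 4 - thislen; // the remaining 2 bytes come from the next
    assert_eq!(resume_off, 2);    // page, and reading resumes at offset 2
}
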
     144              : 
     145              : /// Mask for the high bit and the three reserved compression bits of the header byte
     146              : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
     147              : 
     148              : /// The maximum size of blobs we support. The highest few bits
     149              : /// are reserved for compression and other future uses.
     150              : const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
     151              : 
     152              : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
     153              : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
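
A few illustrative compile-time assertions (not part of the measured source) showing how these constants fit together; the masking mirrors what `read_blob_into_buf` does with the first header byte:

const _: () = {
    assert!(BYTE_UNCOMPRESSED == 0b1000_0000); // high bit set, CCC = 000
    assert!(BYTE_ZSTD == 0b1001_0000);         // high bit set, CCC = 001
    // A header byte such as 0x9a tags as zstd once masked:
    assert!(0x9au8 & LEN_COMPRESSION_BIT_MASK == BYTE_ZSTD);
    // The length bits of MAX_SUPPORTED_LEN never reach the tag nibble:
    assert!(MAX_SUPPORTED_LEN >> 28 == 0);
};
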
     154              : 
     155              : /// A wrapper around `VirtualFile` that allows users to write blobs.
     156              : ///
     157              : /// If a `BlobWriter` is dropped, the internal buffer will be
     158              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
     159              : /// manually before dropping.
     160              : pub struct BlobWriter<const BUFFERED: bool> {
     161              :     inner: VirtualFile,
     162              :     offset: u64,
     163              :     /// A buffer to save on write calls, only used if BUFFERED=true
     164              :     buf: Vec<u8>,
     165              :     /// We do tiny writes for the length headers; they need to be in an owned buffer.
     166              :     io_buf: Option<BytesMut>,
     167              : }
     168              : 
     169              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     170         1690 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     171         1690 :         Self {
     172         1690 :             inner,
     173         1690 :             offset: start_offset,
     174         1690 :             buf: Vec::with_capacity(Self::CAPACITY),
     175         1690 :             io_buf: Some(BytesMut::new()),
     176         1690 :         }
     177         1690 :     }
     178              : 
     179      2025870 :     pub fn size(&self) -> u64 {
     180      2025870 :         self.offset
     181      2025870 :     }
     182              : 
     183              :     const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
     184              : 
     185              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     186              :     /// You need to make sure that the internal buffer is empty, otherwise
     187              :     /// data will be written in the wrong order.
     188              :     #[inline(always)]
     189      1084690 :     async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     190      1084690 :         &mut self,
     191      1084690 :         src_buf: B,
     192      1084690 :         ctx: &RequestContext,
     193      1084690 :     ) -> (B::Buf, Result<(), Error>) {
     194      1084690 :         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
     195      1084690 :         let nbytes = match res {
     196      1084690 :             Ok(nbytes) => nbytes,
     197            0 :             Err(e) => return (src_buf, Err(e)),
     198              :         };
     199      1084690 :         self.offset += nbytes as u64;
     200      1084690 :         (src_buf, Ok(()))
     201      1084690 :     }
     202              : 
     203              :     #[inline(always)]
     204              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     205        10740 :     pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
     206        10740 :         let buf = std::mem::take(&mut self.buf);
     207        10740 :         let (mut buf, res) = self.inner.write_all(buf, ctx).await;
     208        10740 :         res?;
     209        10740 :         buf.clear();
     210        10740 :         self.buf = buf;
     211        10740 :         Ok(())
     212        10740 :     }
     213              : 
     214              :     #[inline(always)]
     215              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     216     12943388 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     217     12943388 :         let remaining = Self::CAPACITY - self.buf.len();
     218     12943388 :         let to_copy = src_buf.len().min(remaining);
     219     12943388 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     220     12943388 :         self.offset += to_copy as u64;
     221     12943388 :         to_copy
     222     12943388 :     }
     223              : 
     224              :     /// Internal, possibly buffered, write function
     225     14018820 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     226     14018820 :         &mut self,
     227     14018820 :         src_buf: B,
     228     14018820 :         ctx: &RequestContext,
     229     14018820 :     ) -> (B::Buf, Result<(), Error>) {
     230     14018820 :         if !BUFFERED {
     231      1084460 :             assert!(self.buf.is_empty());
     232      1084460 :             return self.write_all_unbuffered(src_buf, ctx).await;
     233     12934360 :         }
     234     12934360 :         let remaining = Self::CAPACITY - self.buf.len();
     235     12934360 :         let src_buf_len = src_buf.bytes_init();
     236     12934360 :         if src_buf_len == 0 {
     237           24 :             return (Slice::into_inner(src_buf.slice_full()), Ok(()));
     238     12934336 :         }
     239     12934336 :         let mut src_buf = src_buf.slice(0..src_buf_len);
     240     12934336 :         // First try to copy as much as we can into the buffer
     241     12934336 :         if remaining > 0 {
     242     12934336 :             let copied = self.write_into_buffer(&src_buf);
     243     12934336 :             src_buf = src_buf.slice(copied..);
     244     12934336 :         }
     245              :         // Then, if the buffer is full, flush it out
     246     12934336 :         if self.buf.len() == Self::CAPACITY {
     247         9322 :             if let Err(e) = self.flush_buffer(ctx).await {
     248            0 :                 return (Slice::into_inner(src_buf), Err(e));
     249         9322 :             }
     250     12925014 :         }
     251              :         // Finally, write the tail of src_buf:
     252              :         // If it wholly fits into the buffer without
     253              :         // completely filling it, then put it there.
     254              :         // If not, write it out directly.
     255     12934336 :         let src_buf = if !src_buf.is_empty() {
     256         9282 :             assert_eq!(self.buf.len(), 0);
     257         9282 :             if src_buf.len() < Self::CAPACITY {
     258         9052 :                 let copied = self.write_into_buffer(&src_buf);
     259         9052 :                 // We just verified above that src_buf fits into our internal buffer.
     260         9052 :                 assert_eq!(copied, src_buf.len());
     261         9052 :                 Slice::into_inner(src_buf)
     262              :             } else {
     263          230 :                 let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
     264          230 :                 if let Err(e) = res {
     265            0 :                     return (src_buf, Err(e));
     266          230 :                 }
     267          230 :                 src_buf
     268              :             }
     269              :         } else {
     270     12925054 :             Slice::into_inner(src_buf)
     271              :         };
     272     12934336 :         (src_buf, Ok(()))
     273     14018820 :     }
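
A worked trace of the buffered path above, assuming `CAPACITY` = 64 KiB, an initially empty buffer, and a single 70 KiB write:

fn buffered_write_trace() {
    let (capacity, src_len) = (64 * 1024usize, 70 * 1024usize);
    let copied = src_len.min(capacity); // 65536 bytes fill the buffer,
    assert_eq!(copied, capacity);       // which is then flushed to disk;
    let tail = src_len - copied;        // the 6144-byte tail fits in the
    assert!(tail < capacity);           // now-empty buffer and is retained
}                                       // until the next flush
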
     274              : 
     275              :     /// Write a blob of data. Returns the offset that it was written to,
     276              :     /// which can be used to retrieve the data later.
     277        10348 :     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     278        10348 :         &mut self,
     279        10348 :         srcbuf: B,
     280        10348 :         ctx: &RequestContext,
     281        10348 :     ) -> (B::Buf, Result<u64, Error>) {
     282        10348 :         let (buf, res) = self
     283        10348 :             .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
     284         4712 :             .await;
     285        10348 :         (buf, res.map(|(off, _compression_info)| off))
     286        10348 :     }
     287              : 
     288              :     /// Write a blob of data, compressed with the given algorithm when that
     289              :     /// shrinks it. Returns the offset it was written to, for later retrieval.
     290      7009410 :     pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     291      7009410 :         &mut self,
     292      7009410 :         srcbuf: B,
     293      7009410 :         ctx: &RequestContext,
     294      7009410 :         algorithm: ImageCompressionAlgorithm,
     295      7009410 :     ) -> (B::Buf, Result<(u64, CompressionInfo), Error>) {
     296      7009410 :         let offset = self.offset;
     297      7009410 :         let mut compression_info = CompressionInfo {
     298      7009410 :             written_compressed: false,
     299      7009410 :             compressed_size: None,
     300      7009410 :         };
     301      7009410 : 
     302      7009410 :         let len = srcbuf.bytes_init();
     303      7009410 : 
     304      7009410 :         let mut io_buf = self.io_buf.take().expect("we always put it back below");
     305      7009410 :         io_buf.clear();
     306      7009410 :         let mut compressed_buf = None;
     307      7009410 :         let ((io_buf, hdr_res), srcbuf) = async {
     308      7009410 :             if len < 128 {
     309              :                 // Short blob. Write a 1-byte length header
     310      6993046 :                 io_buf.put_u8(len as u8);
     311      6993046 :                 (
     312      6993046 :                     self.write_all(io_buf, ctx).await,
     313      6993046 :                     srcbuf.slice_full().into_inner(),
     314              :                 )
     315              :             } else {
     316              :                 // Write a 4-byte length header
     317        16364 :                 if len > MAX_SUPPORTED_LEN {
     318            0 :                     return (
     319            0 :                         (
     320            0 :                             io_buf,
     321            0 :                             Err(Error::new(
     322            0 :                                 ErrorKind::Other,
     323            0 :                                 format!("blob too large ({len} bytes)"),
     324            0 :                             )),
     325            0 :                         ),
     326            0 :                         srcbuf.slice_full().into_inner(),
     327            0 :                     );
     328        16364 :                 }
     329        16364 :                 let (high_bit_mask, len_written, srcbuf) = match algorithm {
     330         2050 :                     ImageCompressionAlgorithm::Zstd { level } => {
     331         2050 :                         let mut encoder = if let Some(level) = level {
     332         2050 :                             async_compression::tokio::write::ZstdEncoder::with_quality(
     333         2050 :                                 Vec::new(),
     334         2050 :                                 Level::Precise(level.into()),
     335         2050 :                             )
     336              :                         } else {
     337            0 :                             async_compression::tokio::write::ZstdEncoder::new(Vec::new())
     338              :                         };
     339         2050 :                         let slice = srcbuf.slice_full();
     340         2050 :                         encoder.write_all(&slice[..]).await.unwrap();
     341         2050 :                         encoder.shutdown().await.unwrap();
     342         2050 :                         let compressed = encoder.into_inner();
     343         2050 :                         compression_info.compressed_size = Some(compressed.len());
     344         2050 :                         if compressed.len() < len {
     345            6 :                             compression_info.written_compressed = true;
     346            6 :                             let compressed_len = compressed.len();
     347            6 :                             compressed_buf = Some(compressed);
     348            6 :                             (BYTE_ZSTD, compressed_len, slice.into_inner())
     349              :                         } else {
     350         2044 :                             (BYTE_UNCOMPRESSED, len, slice.into_inner())
     351              :                         }
     352              :                     }
     353              :                     ImageCompressionAlgorithm::Disabled => {
     354        14314 :                         (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
     355              :                     }
     356              :                 };
     357        16364 :                 let mut len_buf = (len_written as u32).to_be_bytes();
     358        16364 :                 assert_eq!(len_buf[0] & 0xf0, 0);
     359        16364 :                 len_buf[0] |= high_bit_mask;
     360        16364 :                 io_buf.extend_from_slice(&len_buf[..]);
     361        16364 :                 (self.write_all(io_buf, ctx).await, srcbuf)
     362              :             }
     363      7009410 :         }
     364       276233 :         .await;
     365      7009410 :         self.io_buf = Some(io_buf);
     366      7009410 :         match hdr_res {
     367      7009410 :             Ok(_) => (),
     368            0 :             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
     369              :         }
     370      7009410 :         let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
     371            6 :             let (_buf, res) = self.write_all(compressed_buf, ctx).await;
     372            6 :             (Slice::into_inner(srcbuf.slice(..)), res)
     373              :         } else {
     374      7009404 :             self.write_all(srcbuf, ctx).await
     375              :         };
     376      7009410 :         (srcbuf, res.map(|_| (offset, compression_info)))
     377      7009410 :     }
     378              : }
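
A minimal end-to-end sketch of this API (the path and payload are hypothetical; the calls mirror the round-trip tests at the bottom of this file, including the trailing page of zeros those tests write so that `read_blk` can always fetch a full page):

async fn blob_round_trip(ctx: &RequestContext) -> Result<(), Error> {
    use crate::tenant::block_io::BlockReaderRef;

    let path = camino::Utf8PathBuf::from("/tmp/blob_io_example");
    let file = VirtualFile::create(path.as_path(), ctx).await?;
    let mut wtr = BlobWriter::<true>::new(file, 0);

    // Write one compressible blob; zstd is kept only if it actually shrinks it.
    let payload = vec![0u8; 4 * 1024];
    let (_, res) = wtr
        .write_blob_maybe_compressed(
            payload.clone(),
            ctx,
            ImageCompressionAlgorithm::Zstd { level: Some(1) },
        )
        .await;
    let (offset, _info) = res?;

    // Pad with a page of zeros so the last blob can be read back page-wise,
    // then flush the internal buffer before dropping the writer.
    let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
    res?;
    let file = wtr.into_inner(ctx).await?; // flushes the buffer
    drop(file);

    let file = VirtualFile::open(path, ctx).await?;
    let rdr = BlockCursor::new_with_compression(BlockReaderRef::VirtualFile(&file), true);
    assert_eq!(rdr.read_blob(offset, ctx).await?, payload);
    Ok(())
}
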
     379              : 
     380              : impl BlobWriter<true> {
     381              :     /// Access the underlying `VirtualFile`.
     382              :     ///
     383              :     /// This function flushes the internal buffer before giving access
     384              :     /// to the underlying `VirtualFile`.
     385         1378 :     pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
     386         1378 :         self.flush_buffer(ctx).await?;
     387         1378 :         Ok(self.inner)
     388         1378 :     }
     389              : 
     390              :     /// Access the underlying `VirtualFile`.
     391              :     ///
     392              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     393              :     /// the internal buffer before giving access.
     394            0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     395            0 :         self.inner
     396            0 :     }
     397              : }
     398              : 
     399              : impl BlobWriter<false> {
     400              :     /// Access the underlying `VirtualFile`.
     401          272 :     pub fn into_inner(self) -> VirtualFile {
     402          272 :         self.inner
     403          272 :     }
     404              : }
     405              : 
     406              : #[cfg(test)]
     407              : pub(crate) mod tests {
     408              :     use super::*;
     409              :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     410              :     use camino::Utf8PathBuf;
     411              :     use camino_tempfile::Utf8TempDir;
     412              :     use rand::{Rng, SeedableRng};
     413              : 
     414           24 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     415        15054 :         round_trip_test_compressed::<BUFFERED>(blobs, false).await
     416           24 :     }
     417              : 
     418           40 :     pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
     419           40 :         blobs: &[Vec<u8>],
     420           40 :         compression: bool,
     421           40 :         ctx: &RequestContext,
     422           40 :     ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
     423           40 :         let temp_dir = camino_tempfile::tempdir()?;
     424           40 :         let pathbuf = temp_dir.path().join("file");
     425           40 : 
     426           40 :         // Write part (in block to drop the file)
     427           40 :         let mut offsets = Vec::new();
     428              :         {
     429           40 :             let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
     430           40 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     431        12408 :             for blob in blobs.iter() {
     432        12408 :                 let (_, res) = if compression {
     433         2100 :                     let res = wtr
     434         2100 :                         .write_blob_maybe_compressed(
     435         2100 :                             blob.clone(),
     436         2100 :                             ctx,
     437         2100 :                             ImageCompressionAlgorithm::Zstd { level: Some(1) },
     438         2100 :                         )
     439          154 :                         .await;
     440         2100 :                     (res.0, res.1.map(|(off, _)| off))
     441              :                 } else {
     442        10308 :                     wtr.write_blob(blob.clone(), ctx).await
     443              :                 };
     444        12408 :                 let offs = res?;
     445        12408 :                 offsets.push(offs);
     446              :             }
     447              :             // Write out one page worth of zeros so that we can
     448              :             // read again with read_blk
     449           40 :             let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
     450           40 :             let offs = res?;
     451           40 :             println!("Writing final blob at offs={offs}");
     452           40 :             wtr.flush_buffer(ctx).await?;
     453              :         }
     454           40 :         Ok((temp_dir, pathbuf, offsets))
     455           40 :     }
     456              : 
     457           32 :     async fn round_trip_test_compressed<const BUFFERED: bool>(
     458           32 :         blobs: &[Vec<u8>],
     459           32 :         compression: bool,
     460           32 :     ) -> Result<(), Error> {
     461           32 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     462           32 :         let (_temp_dir, pathbuf, offsets) =
     463         4630 :             write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
     464              : 
     465           32 :         let file = VirtualFile::open(pathbuf, &ctx).await?;
     466           32 :         let rdr = BlockReaderRef::VirtualFile(&file);
     467           32 :         let rdr = BlockCursor::new_with_compression(rdr, compression);
     468         8288 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     469        10619 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     470         8288 :             assert_eq!(
     471         8288 :                 blob, &blob_read,
     472            0 :                 "mismatch for idx={idx} at offset={offset}"
     473              :             );
     474              :         }
     475           32 :         Ok(())
     476           32 :     }
     477              : 
     478         6158 :     pub(crate) fn random_array(len: usize) -> Vec<u8> {
     479         6158 :         let mut rng = rand::thread_rng();
     480     68151904 :         (0..len).map(|_| rng.gen()).collect::<_>()
     481         6158 :     }
     482              : 
     483              :     #[tokio::test]
     484            2 :     async fn test_one() -> Result<(), Error> {
     485            2 :         let blobs = &[vec![12, 21, 22]];
     486            8 :         round_trip_test::<false>(blobs).await?;
     487            4 :         round_trip_test::<true>(blobs).await?;
     488            2 :         Ok(())
     489            2 :     }
     490              : 
     491              :     #[tokio::test]
     492            2 :     async fn test_hello_simple() -> Result<(), Error> {
     493            2 :         let blobs = &[
     494            2 :             vec![0, 1, 2, 3],
     495            2 :             b"Hello, World!".to_vec(),
     496            2 :             Vec::new(),
     497            2 :             b"foobar".to_vec(),
     498            2 :         ];
     499           16 :         round_trip_test::<false>(blobs).await?;
     500            7 :         round_trip_test::<true>(blobs).await?;
     501           15 :         round_trip_test_compressed::<false>(blobs, true).await?;
     502            7 :         round_trip_test_compressed::<true>(blobs, true).await?;
     503            2 :         Ok(())
     504            2 :     }
     505              : 
     506              :     #[tokio::test]
     507            2 :     async fn test_really_big_array() -> Result<(), Error> {
     508            2 :         let blobs = &[
     509            2 :             b"test".to_vec(),
     510            2 :             random_array(10 * PAGE_SZ),
     511            2 :             b"hello".to_vec(),
     512            2 :             random_array(66 * PAGE_SZ),
     513            2 :             vec![0xf3; 24 * PAGE_SZ],
     514            2 :             b"foobar".to_vec(),
     515            2 :         ];
     516          124 :         round_trip_test::<false>(blobs).await?;
     517          116 :         round_trip_test::<true>(blobs).await?;
     518          100 :         round_trip_test_compressed::<false>(blobs, true).await?;
     519           89 :         round_trip_test_compressed::<true>(blobs, true).await?;
     520            2 :         Ok(())
     521            2 :     }
     522              : 
     523              :     #[tokio::test]
     524            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     525            2 :         let blobs = (0..PAGE_SZ / 8)
     526         2048 :             .map(|v| random_array(v * 16))
     527            2 :             .collect::<Vec<_>>();
     528         4162 :         round_trip_test::<false>(&blobs).await?;
     529         2212 :         round_trip_test::<true>(&blobs).await?;
     530            2 :         Ok(())
     531            2 :     }
     532              : 
     533              :     #[tokio::test]
     534            2 :     async fn test_arrays_random_size() -> Result<(), Error> {
     535            2 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     536            2 :         let blobs = (0..1024)
     537         2048 :             .map(|_| {
     538         2048 :                 let mut sz: u16 = rng.gen();
     539         2048 :                 // Make 50% of the arrays small
     540         2048 :                 if rng.gen() {
     541         1032 :                     sz &= 63;
     542         1032 :                 }
     543         2048 :                 random_array(sz.into())
     544         2048 :             })
     545            2 :             .collect::<Vec<_>>();
     546         5106 :         round_trip_test::<false>(&blobs).await?;
     547         3279 :         round_trip_test::<true>(&blobs).await?;
     548            2 :         Ok(())
     549            2 :     }
     550              : 
     551              :     #[tokio::test]
     552            2 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     553            2 :         let blobs = &[
     554            2 :             random_array(PAGE_SZ - 4),
     555            2 :             random_array(PAGE_SZ - 4),
     556            2 :             random_array(PAGE_SZ - 4),
     557            2 :         ];
     558           14 :         round_trip_test::<false>(blobs).await?;
     559            6 :         round_trip_test::<true>(blobs).await?;
     560            2 :         Ok(())
     561            2 :     }
     562              : }
        

Generated by: LCOV version 2.1-beta