//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length
//! is written as a single byte. If it's larger than that, the length
//! is written as a four-byte integer, in big-endian, with the high
//! bit set. This way, we can detect whether it's a 1- or 4-byte header
//! by peeking at the first byte. For blobs of 128 bytes or more, the
//! next three bits are reserved for compression information; only one
//! bit pattern is currently in use (0b001), signifying compression
//! with zstd.
//!
//! len <  128: 0XXXXXXX
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
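//! For example (a sketch following the rules above): a 3-byte blob is
//! written with the single length byte 0x03, and a 5000-byte blob
//! (0x1388) stored uncompressed gets the four-byte header
//! 0x80 0x00 0x13 0x88. If zstd shrank that blob to, say, 2000 bytes
//! (0x07d0), it would instead be written with the header
//! 0x90 0x00 0x07 0xd0 followed by the 2000 compressed bytes.
//!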
use std::cmp::min;
use std::io::Error;

use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};

#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
    pub written_compressed: bool,
    pub compressed_size: Option<usize>,
}

/// A blob header, with header+data length and compression info.
///
/// TODO: use this more widely, and add an encode() method too.
/// TODO: document the header format.
#[derive(Clone, Copy, Default)]
pub struct Header {
    pub header_len: usize,
    pub data_len: usize,
    pub compression_bits: u8,
}

impl Header {
    /// Decodes a header from a byte slice.
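    ///
    /// For illustration: `[0x03]` decodes to `header_len: 1, data_len: 3`
    /// with `compression_bits == BYTE_UNCOMPRESSED`, while
    /// `[0x90, 0x00, 0x13, 0x88]` decodes to `header_len: 4, data_len: 5000`
    /// with `compression_bits == BYTE_ZSTD`.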
    pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
        let Some(&first_header_byte) = bytes.first() else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "zero-length blob header",
            ));
        };

        // If the first bit is 0, this is just a 1-byte length prefix for lengths below 128 bytes.
        if first_header_byte < 0x80 {
            return Ok(Self {
                header_len: 1, // by definition
                data_len: first_header_byte as usize,
                compression_bits: BYTE_UNCOMPRESSED,
            });
        }

        // Otherwise, this is a 4-byte header containing compression information and length.
        // Use `get` so that a too-short slice yields an error instead of a slicing panic.
        const HEADER_LEN: usize = 4;
        let mut header_buf: [u8; HEADER_LEN] = bytes
            .get(0..HEADER_LEN)
            .and_then(|slice| slice.try_into().ok())
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("blob header too short: {bytes:?}"),
                )
            })?;

        // TODO: verify the compression bits and convert to an enum.
        let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
        header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
        let data_len = u32::from_be_bytes(header_buf) as usize;

        Ok(Self {
            header_len: HEADER_LEN,
            data_len,
            compression_bits,
        })
    }

    /// Returns the total header+data length.
    pub fn total_len(&self) -> usize {
        self.header_len + self.data_len
    }
}

impl BlockCursor<'_> {
    /// Read a blob into a new buffer.
    pub async fn read_blob(
        &self,
        offset: u64,
        ctx: &RequestContext,
    ) -> Result<Vec<u8>, std::io::Error> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf, ctx).await?;
        Ok(buf)
    }

    /// Read a blob into the given buffer. Any previous contents in the buffer
    /// are overwritten.
    pub async fn read_blob_into_buf(
        &self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
        ctx: &RequestContext,
    ) -> Result<(), std::io::Error> {
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum, ctx).await?;

        // Peek at the first byte to determine if it's a 1- or 4-byte length.
        let first_len_byte = buf[off];
        let len: usize = if first_len_byte < 0x80 {
            // 1-byte length header
            off += 1;
            first_len_byte as usize
        } else {
            // 4-byte length header
            let mut len_buf = [0u8; 4];
            let thislen = PAGE_SZ - off;
            if thislen < 4 {
                // it is split across two pages
                len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
                off = 4 - thislen;
            } else {
                len_buf.copy_from_slice(&buf[off..off + 4]);
                off += 4;
            }
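            // When this cursor was created without compression support, mask
            // off only the high bit: presumably this keeps blobs written
            // before the compression bits were reserved (which may use those
            // bits as part of the length) readable.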
            let bit_mask = if self.read_compressed {
                !LEN_COMPRESSION_BIT_MASK
            } else {
                0x7f
            };
            len_buf[0] &= bit_mask;
            u32::from_be_bytes(len_buf) as usize
        };
        let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

        let mut tmp_buf = Vec::new();
        let buf_to_write;
        let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
            if compression_bits > BYTE_UNCOMPRESSED {
                warn!("reading key above future limit ({len} bytes)");
            }
            buf_to_write = dstbuf;
            None
        } else if compression_bits == BYTE_ZSTD {
            buf_to_write = &mut tmp_buf;
            Some(dstbuf)
        } else {
            let error = std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid compression byte {compression_bits:x}"),
            );
            return Err(error);
        };

        buf_to_write.clear();
        buf_to_write.reserve(len);

        // Read the payload
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // continue on next page
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }

        if let Some(dstbuf) = compression {
            if compression_bits == BYTE_ZSTD {
                let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
                decoder.write_all(buf_to_write).await?;
                decoder.flush().await?;
            } else {
                unreachable!("already checked above")
            }
        }

        Ok(())
    }
}

/// Reserved bits for length and compression.
pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and future uses.
pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;

pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
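
// For illustration: BYTE_UNCOMPRESSED is 0b1000_0000 (just the high "4-byte
// header" bit) and BYTE_ZSTD is 0b1001_0000 (the high bit plus the zstd
// compression bit). MAX_SUPPORTED_BLOB_LEN is 2^28 - 1, the largest value
// expressible in the 28 length bits (the Xs) of a 4-byte header.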

/// A wrapper around `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
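///
/// A minimal usage sketch (hypothetical variable names; assumes a
/// `VirtualFile`, a gate, a cancellation token, and a `RequestContext`
/// are already at hand):
///
/// ```ignore
/// let mut writer = BlobWriter::<true>::new(file, 0, &gate, cancel, &ctx);
/// let (_buf, res) = writer.write_blob(data.slice_len(), &ctx).await;
/// let offset = res?;
/// writer.flush_buffer(&ctx).await?; // flush before drop, or the tail is lost
/// ```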
pub struct BlobWriter<const BUFFERED: bool> {
    inner: VirtualFile,
    offset: u64,
    /// A buffer to save on write calls; only used if `BUFFERED` is true.
    buf: Vec<u8>,
    /// We do tiny writes for the length headers; they need to be in an owned buffer.
    io_buf: Option<BytesMut>,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
    pub fn new(
        inner: VirtualFile,
        start_offset: u64,
        _gate: &utils::sync::gate::Gate,
        _cancel: CancellationToken,
        _ctx: &RequestContext,
    ) -> Self {
        Self {
            inner,
            offset: start_offset,
            buf: Vec::with_capacity(Self::CAPACITY),
            io_buf: Some(BytesMut::new()),
        }
    }

    pub fn size(&self) -> u64 {
        self.offset
    }

    const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };

    /// Writes the given buffer directly to the underlying `VirtualFile`.
    /// You need to make sure that the internal buffer is empty; otherwise
    /// data will be written in the wrong order.
    #[inline(always)]
    async fn write_all_unbuffered<Buf: IoBuf + Send>(
        &mut self,
        src_buf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<(), Error>) {
        let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
        let nbytes = match res {
            Ok(nbytes) => nbytes,
            Err(e) => return (src_buf, Err(e)),
        };
        self.offset += nbytes as u64;
        (src_buf, Ok(()))
    }

    /// Flushes the internal buffer to the underlying `VirtualFile`.
    #[inline(always)]
    pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
        let buf = std::mem::take(&mut self.buf);
        let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
        res?;
        let mut buf = slice.into_raw_slice().into_inner();
        buf.clear();
        self.buf = buf;
        Ok(())
    }

    /// Writes as much of `src_buf` into the internal buffer as fits.
    #[inline(always)]
    fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
        let remaining = Self::CAPACITY - self.buf.len();
        let to_copy = src_buf.len().min(remaining);
        self.buf.extend_from_slice(&src_buf[..to_copy]);
        self.offset += to_copy as u64;
        to_copy
    }

    /// Internal, possibly buffered, write function
    async fn write_all<Buf: IoBuf + Send>(
        &mut self,
        src_buf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<(), Error>) {
        let src_buf = src_buf.into_raw_slice();
        let src_buf_bounds = src_buf.bounds();
        let restore = move |src_buf_slice: Slice<_>| {
            FullSlice::must_new(Slice::from_buf_bounds(
                src_buf_slice.into_inner(),
                src_buf_bounds,
            ))
        };

        if !BUFFERED {
            assert!(self.buf.is_empty());
            return self
                .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
                .await;
        }
        let remaining = Self::CAPACITY - self.buf.len();
        let src_buf_len = src_buf.bytes_init();
        if src_buf_len == 0 {
            return (restore(src_buf), Ok(()));
        }
        let mut src_buf = src_buf.slice(0..src_buf_len);
        // First try to copy as much as we can into the buffer
        if remaining > 0 {
            let copied = self.write_into_buffer(&src_buf);
            src_buf = src_buf.slice(copied..);
        }
        // Then, if the buffer is full, flush it out
        if self.buf.len() == Self::CAPACITY {
            if let Err(e) = self.flush_buffer(ctx).await {
                return (restore(src_buf), Err(e));
            }
        }
        // Finally, write the tail of src_buf:
        // If it wholly fits into the buffer without
        // completely filling it, then put it there.
        // If not, write it out directly.
        let src_buf = if !src_buf.is_empty() {
            assert_eq!(self.buf.len(), 0);
            if src_buf.len() < Self::CAPACITY {
                let copied = self.write_into_buffer(&src_buf);
                // We just verified above that src_buf fits into our internal buffer.
                assert_eq!(copied, src_buf.len());
                restore(src_buf)
            } else {
                let (src_buf, res) = self
                    .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
                    .await;
                if let Err(e) = res {
                    return (src_buf, Err(e));
                }
                src_buf
            }
        } else {
            restore(src_buf)
        };
        (src_buf, Ok(()))
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob<Buf: IoBuf + Send>(
        &mut self,
        srcbuf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<u64, Error>) {
        let (buf, res) = self
            .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
            .await;
        (buf, res.map(|(off, _compression_info)| off))
    }

    /// Write a blob of data, optionally compressing it with the given
    /// algorithm. Returns the offset that it was written to, which can
    /// be used to retrieve the data later.
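    ///
    /// With `ImageCompressionAlgorithm::Zstd`, the data is compressed first,
    /// but written compressed only if that actually makes it smaller;
    /// otherwise the uncompressed bytes are written. Blobs shorter than 128
    /// bytes always use the 1-byte header and are never compressed.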
    pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
        &mut self,
        srcbuf: FullSlice<Buf>,
        ctx: &RequestContext,
        algorithm: ImageCompressionAlgorithm,
    ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
        let offset = self.offset;
        let mut compression_info = CompressionInfo {
            written_compressed: false,
            compressed_size: None,
        };

        let len = srcbuf.len();

        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        let mut compressed_buf = None;
        let ((io_buf_slice, hdr_res), srcbuf) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
            } else {
                // Write a 4-byte length header
                if len > MAX_SUPPORTED_BLOB_LEN {
                    return (
                        (
                            io_buf.slice_len(),
                            Err(Error::other(format!("blob too large ({len} bytes)"))),
                        ),
                        srcbuf,
                    );
                }
                let (high_bit_mask, len_written, srcbuf) = match algorithm {
                    ImageCompressionAlgorithm::Zstd { level } => {
                        let mut encoder = if let Some(level) = level {
                            async_compression::tokio::write::ZstdEncoder::with_quality(
                                Vec::new(),
                                Level::Precise(level.into()),
                            )
                        } else {
                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
                        };
                        encoder.write_all(&srcbuf[..]).await.unwrap();
                        encoder.shutdown().await.unwrap();
                        let compressed = encoder.into_inner();
                        compression_info.compressed_size = Some(compressed.len());
                        if compressed.len() < len {
                            compression_info.written_compressed = true;
                            let compressed_len = compressed.len();
                            compressed_buf = Some(compressed);
                            (BYTE_ZSTD, compressed_len, srcbuf)
                        } else {
                            (BYTE_UNCOMPRESSED, len, srcbuf)
                        }
                    }
                    ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
                };
                let mut len_buf = (len_written as u32).to_be_bytes();
                assert_eq!(len_buf[0] & 0xf0, 0);
                len_buf[0] |= high_bit_mask;
                io_buf.extend_from_slice(&len_buf[..]);
                (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
            }
        }
        .await;
        self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (srcbuf, Err(e)),
        }
        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
            let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
            (srcbuf, res)
        } else {
            self.write_all(srcbuf, ctx).await
        };
        (srcbuf, res.map(|_| (offset, compression_info)))
    }

    /// Writes a raw blob containing both header and data, returning its offset.
    pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
        &mut self,
        raw_with_header: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<u64, Error>) {
        // Verify the header, to ensure we don't write invalid/corrupt data.
        let header = match Header::decode(&raw_with_header) {
            Ok(header) => header,
            Err(err) => return (raw_with_header, Err(err)),
        };
        if raw_with_header.len() != header.total_len() {
            let header_total_len = header.total_len();
            let raw_len = raw_with_header.len();
            return (
                raw_with_header,
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("header length mismatch: {header_total_len} != {raw_len}"),
                )),
            );
        }

        let offset = self.offset;
        let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
        (raw_with_header, result.map(|_| offset))
    }
}

impl BlobWriter<true> {
    /// Access the underlying `VirtualFile`.
    ///
    /// This function flushes the internal buffer before giving access
    /// to the underlying `VirtualFile`.
    pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
        self.flush_buffer(ctx).await?;
        Ok(self.inner)
    }

    /// Access the underlying `VirtualFile`.
    ///
    /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
    /// the internal buffer before giving access.
    pub fn into_inner_no_flush(self) -> VirtualFile {
        self.inner
    }
}

impl BlobWriter<false> {
    /// Access the underlying `VirtualFile`.
    pub fn into_inner(self) -> VirtualFile {
        self.inner
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use camino::Utf8PathBuf;
    use camino_tempfile::Utf8TempDir;
    use rand::{Rng, SeedableRng};

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;

    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        round_trip_test_compressed::<BUFFERED>(blobs, false).await
    }

    pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
        blobs: &[Vec<u8>],
        compression: bool,
        ctx: &RequestContext,
    ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        // Write part (in a block, to drop the file)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
            for blob in blobs.iter() {
                let (_, res) = if compression {
                    let res = wtr
                        .write_blob_maybe_compressed(
                            blob.clone().slice_len(),
                            ctx,
                            ImageCompressionAlgorithm::Zstd { level: Some(1) },
                        )
                        .await;
                    (res.0, res.1.map(|(off, _)| off))
                } else {
                    wtr.write_blob(blob.clone().slice_len(), ctx).await
                };
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer(ctx).await?;
        }
        Ok((temp_dir, pathbuf, offsets))
    }

    async fn round_trip_test_compressed<const BUFFERED: bool>(
        blobs: &[Vec<u8>],
        compression: bool,
    ) -> Result<(), Error> {
        let ctx =
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
        let (_temp_dir, pathbuf, offsets) =
            write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;

        let file = VirtualFile::open(pathbuf, &ctx).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new_with_compression(rdr, compression);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    pub(crate) fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.r#gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        round_trip_test_compressed::<false>(blobs, true).await?;
        round_trip_test_compressed::<true>(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"hello".to_vec(),
            random_array(66 * PAGE_SZ),
            vec![0xf3; 24 * PAGE_SZ],
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        round_trip_test_compressed::<false>(blobs, true).await?;
        round_trip_test_compressed::<true>(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.r#gen();
                // Make 50% of the arrays small
                if rng.r#gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
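
    // A minimal additional sketch (not part of the original suite) exercising
    // `Header::decode` directly on hand-built headers; the byte values follow
    // the format described in the module docs.
    #[test]
    fn test_header_decode_sketch() {
        // 1-byte header: length 3, uncompressed by definition.
        let h = Header::decode(&[0x03, 1, 2, 3]).unwrap();
        assert_eq!(h.header_len, 1);
        assert_eq!(h.data_len, 3);
        assert_eq!(h.compression_bits, BYTE_UNCOMPRESSED);

        // 4-byte header: 0x90 = high bit | zstd bit; the remaining bits
        // encode the length 0x00_1388 = 5000.
        let h = Header::decode(&[0x90, 0x00, 0x13, 0x88]).unwrap();
        assert_eq!(h.header_len, 4);
        assert_eq!(h.data_len, 5000);
        assert_eq!(h.compression_bits, BYTE_ZSTD);

        // An empty slice must fail rather than panic.
        assert!(Header::decode(&[]).is_err());
    }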
}
        
