|             Line data    Source code 
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as one byte. Otherwise, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9              : //! by peeking at the first byte. For blobs of 128 bytes or more,
      10              : //! we also specify three reserved bits; only one of the
      11              : //! bit patterns is currently in use (0b001) and signifies compression
      12              : //! with zstd.
      13              : //!
      14              : //! len <  128: 0XXXXXXX
      15              : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      16              : //!
      17              : use std::cmp::min;
      18              : 
      19              : use anyhow::Context;
      20              : use async_compression::Level;
      21              : use bytes::{BufMut, BytesMut};
      22              : use pageserver_api::models::ImageCompressionAlgorithm;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::IoBuf;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::warn;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::page_cache::PAGE_SZ;
      30              : use crate::tenant::block_io::BlockCursor;
      31              : use crate::virtual_file::IoBufferMut;
      32              : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
      33              : use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
      34              : use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter};
      35              : 
/// Outcome of the compression decision taken while writing one blob.
#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
    /// `true` when the compressed representation is the one that was written.
    pub written_compressed: bool,
    /// Size in bytes of the compressed representation when compression was
    /// attempted, even if the result was discarded for not being smaller than
    /// the input; `None` when no compression was attempted.
    pub compressed_size: Option<usize>,
}
      41              : 
      42              : /// A blob header, with header+data length and compression info.
      43              : ///
      44              : /// TODO: use this more widely, and add an encode() method too.
      45              : /// TODO: document the header format.
      46              : #[derive(Clone, Copy, Default)]
      47              : pub struct Header {
      48              :     pub header_len: usize,
      49              :     pub data_len: usize,
      50              :     pub compression_bits: u8,
      51              : }
      52              : 
      53              : impl Header {
      54              :     /// Decodes a header from a byte slice.
      55      1932411 :     pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
      56      1932411 :         let Some(&first_header_byte) = bytes.first() else {
      57            0 :             anyhow::bail!("zero-length blob header");
      58              :         };
      59              : 
      60              :         // If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
      61      1932411 :         if first_header_byte < 0x80 {
      62      1912174 :             return Ok(Self {
      63      1912174 :                 header_len: 1, // by definition
      64      1912174 :                 data_len: first_header_byte as usize,
      65      1912174 :                 compression_bits: BYTE_UNCOMPRESSED,
      66      1912174 :             });
      67        20237 :         }
      68              : 
      69              :         // Otherwise, this is a 4-byte header containing compression information and length.
      70              :         const HEADER_LEN: usize = 4;
      71        20237 :         let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN]
      72        20237 :             .try_into()
      73        20237 :             .map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?;
      74              : 
      75              :         // TODO: verify the compression bits and convert to an enum.
      76        20237 :         let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
      77        20237 :         header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
      78        20237 :         let data_len = u32::from_be_bytes(header_buf) as usize;
      79              : 
      80        20237 :         Ok(Self {
      81        20237 :             header_len: HEADER_LEN,
      82        20237 :             data_len,
      83        20237 :             compression_bits,
      84        20237 :         })
      85      1932411 :     }
      86              : 
      87              :     /// Returns the total header+data length.
      88        10248 :     pub fn total_len(&self) -> usize {
      89        10248 :         self.header_len + self.data_len
      90        10248 :     }
      91              : }
      92              : 
      93              : #[derive(Debug, thiserror::Error)]
      94              : pub enum WriteBlobError {
      95              :     #[error(transparent)]
      96              :     Flush(FlushTaskError),
      97              :     #[error(transparent)]
      98              :     Other(anyhow::Error),
      99              : }
     100              : 
     101              : impl WriteBlobError {
     102            0 :     pub fn is_cancel(&self) -> bool {
     103            0 :         match self {
     104            0 :             WriteBlobError::Flush(e) => e.is_cancel(),
     105            0 :             WriteBlobError::Other(_) => false,
     106              :         }
     107            0 :     }
     108            0 :     pub fn into_anyhow(self) -> anyhow::Error {
     109            0 :         match self {
     110            0 :             WriteBlobError::Flush(e) => e.into_anyhow(),
     111            0 :             WriteBlobError::Other(e) => e,
     112              :         }
     113            0 :     }
     114              : }
     115              : 
     116              : impl BlockCursor<'_> {
     117              :     /// Read a blob into a new buffer.
     118         2076 :     pub async fn read_blob(
     119         2076 :         &self,
     120         2076 :         offset: u64,
     121         2076 :         ctx: &RequestContext,
     122         2076 :     ) -> Result<Vec<u8>, std::io::Error> {
     123         2076 :         let mut buf = Vec::new();
     124         2076 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
     125         2076 :         Ok(buf)
     126         2076 :     }
     127              :     /// Read blob into the given buffer. Any previous contents in the buffer
     128              :     /// are overwritten.
     129         2108 :     pub async fn read_blob_into_buf(
     130         2108 :         &self,
     131         2108 :         offset: u64,
     132         2108 :         dstbuf: &mut Vec<u8>,
     133         2108 :         ctx: &RequestContext,
     134         2108 :     ) -> Result<(), std::io::Error> {
     135         2108 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
     136         2108 :         let mut off = (offset % PAGE_SZ as u64) as usize;
     137              : 
     138         2108 :         let mut buf = self.read_blk(blknum, ctx).await?;
     139              : 
     140              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
     141         2108 :         let first_len_byte = buf[off];
     142         2108 :         let len: usize = if first_len_byte < 0x80 {
     143              :             // 1-byte length header
     144          572 :             off += 1;
     145          572 :             first_len_byte as usize
     146              :         } else {
     147              :             // 4-byte length header
     148         1536 :             let mut len_buf = [0u8; 4];
     149         1536 :             let thislen = PAGE_SZ - off;
     150         1536 :             if thislen < 4 {
     151              :                 // it is split across two pages
     152            0 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
     153            0 :                 blknum += 1;
     154            0 :                 buf = self.read_blk(blknum, ctx).await?;
     155            0 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
     156            0 :                 off = 4 - thislen;
     157         1536 :             } else {
     158         1536 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
     159         1536 :                 off += 4;
     160         1536 :             }
     161         1536 :             let bit_mask = if self.read_compressed {
     162            7 :                 !LEN_COMPRESSION_BIT_MASK
     163              :             } else {
     164         1529 :                 0x7f
     165              :             };
     166         1536 :             len_buf[0] &= bit_mask;
     167         1536 :             u32::from_be_bytes(len_buf) as usize
     168              :         };
     169         2108 :         let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     170              : 
     171         2108 :         let mut tmp_buf = Vec::new();
     172              :         let buf_to_write;
     173         2108 :         let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
     174         2107 :             if compression_bits > BYTE_UNCOMPRESSED {
     175            0 :                 warn!("reading key above future limit ({len} bytes)");
     176         2107 :             }
     177         2107 :             buf_to_write = dstbuf;
     178         2107 :             None
     179            1 :         } else if compression_bits == BYTE_ZSTD {
     180            1 :             buf_to_write = &mut tmp_buf;
     181            1 :             Some(dstbuf)
     182              :         } else {
     183            0 :             let error = std::io::Error::new(
     184            0 :                 std::io::ErrorKind::InvalidData,
     185            0 :                 format!("invalid compression byte {compression_bits:x}"),
     186              :             );
     187            0 :             return Err(error);
     188              :         };
     189              : 
     190         2108 :         buf_to_write.clear();
     191         2108 :         buf_to_write.reserve(len);
     192              : 
     193              :         // Read the payload
     194         2108 :         let mut remain = len;
     195         7427 :         while remain > 0 {
     196         5319 :             let mut page_remain = PAGE_SZ - off;
     197         5319 :             if page_remain == 0 {
     198              :                 // continue on next page
     199         3221 :                 blknum += 1;
     200         3221 :                 buf = self.read_blk(blknum, ctx).await?;
     201         3221 :                 off = 0;
     202         3221 :                 page_remain = PAGE_SZ;
     203         2098 :             }
     204         5319 :             let this_blk_len = min(remain, page_remain);
     205         5319 :             buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
     206         5319 :             remain -= this_blk_len;
     207         5319 :             off += this_blk_len;
     208              :         }
     209              : 
     210         2108 :         if let Some(dstbuf) = compression {
     211            1 :             if compression_bits == BYTE_ZSTD {
     212            1 :                 let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
     213            1 :                 decoder.write_all(buf_to_write).await?;
     214            1 :                 decoder.flush().await?;
     215              :             } else {
     216            0 :                 unreachable!("already checked above")
     217              :             }
     218         2107 :         }
     219              : 
     220         2108 :         Ok(())
     221         2108 :     }
     222              : }
     223              : 
/// Reserved bits for length and compression
///
/// Mask over the upper four bits of the first byte of a 4-byte blob header:
/// the "4-byte header" flag (`0x80`) plus the three compression bits.
pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;

/// Marker bits of a 4-byte header whose payload is stored uncompressed.
pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
/// Marker bits of a 4-byte header whose payload is zstd-compressed.
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
     233              : 
/// A wrapper of `VirtualFile` that allows users to write blobs.
pub struct BlobWriter<W> {
    /// We do tiny writes for the length headers; they need to be in an owned buffer;
    ///
    /// Taken out while a header is being written and put back afterwards, so it
    /// is `Some` between calls.
    io_buf: Option<BytesMut>,
    /// Buffered writer that all header and payload bytes are handed to.
    writer: BufferedWriter<IoBufferMut, W>,
    /// Offset at which the next blob will start: initialized to `start_offset`
    /// and advanced by the number of bytes accepted by `writer`.
    offset: u64,
}
     241              : 
     242              : impl<W> BlobWriter<W>
     243              : where
     244              :     W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
     245              : {
     246              :     /// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
     247         1129 :     pub fn new(
     248         1129 :         file: W,
     249         1129 :         start_offset: u64,
     250         1129 :         gate: &utils::sync::gate::Gate,
     251         1129 :         cancel: CancellationToken,
     252         1129 :         ctx: &RequestContext,
     253         1129 :         flush_task_span: tracing::Span,
     254         1129 :     ) -> anyhow::Result<Self> {
     255              :         Ok(Self {
     256         1129 :             io_buf: Some(BytesMut::new()),
     257         1129 :             writer: BufferedWriter::new(
     258         1129 :                 file,
     259         1129 :                 start_offset,
     260         2258 :                 || IoBufferMut::with_capacity(Self::CAPACITY),
     261         1129 :                 gate.enter()?,
     262         1129 :                 cancel,
     263         1129 :                 ctx,
     264         1129 :                 flush_task_span,
     265              :             ),
     266         1129 :             offset: start_offset,
     267              :         })
     268         1129 :     }
     269              : 
    /// Returns the current write offset, i.e. `start_offset` plus the number
    /// of bytes written through this writer so far.
    pub fn size(&self) -> u64 {
        self.offset
    }

    /// Capacity, in bytes, requested for each buffer handed to the `BufferedWriter`.
    const CAPACITY: usize = 64 * 1024;
     275              : 
     276              :     /// Writes `src_buf` to the file at the current offset.
     277      6547618 :     async fn write_all<Buf: IoBuf + Send>(
     278      6547618 :         &mut self,
     279      6547618 :         src_buf: FullSlice<Buf>,
     280      6547618 :         ctx: &RequestContext,
     281      6547618 :     ) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
     282      6547618 :         let res = self
     283      6547618 :             .writer
     284      6547618 :             // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
     285      6547618 :             // Can remove all the complexity around owned buffers upstack
     286      6547618 :             .write_buffered_borrowed(&src_buf, ctx)
     287      6547618 :             .await
     288      6547618 :             .map(|len| {
     289      6547618 :                 self.offset += len as u64;
     290      6547618 :             });
     291              : 
     292      6547618 :         (src_buf, res)
     293      6547618 :     }
     294              : 
     295              :     /// Write a blob of data. Returns the offset that it was written to,
     296              :     /// which can be used to retrieve the data later.
     297         3092 :     pub async fn write_blob<Buf: IoBuf + Send>(
     298         3092 :         &mut self,
     299         3092 :         srcbuf: FullSlice<Buf>,
     300         3092 :         ctx: &RequestContext,
     301         3092 :     ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
     302         3092 :         let (buf, res) = self
     303         3092 :             .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
     304         3092 :             .await;
     305         3092 :         (buf, res.map(|(off, _compression_info)| off))
     306         3092 :     }
     307              : 
    /// Write a blob of data, optionally compressing it. Returns the offset that
    /// it was written to, which can be used to retrieve the data later, along
    /// with a [`CompressionInfo`] describing what was done.
    ///
    /// Blobs shorter than 128 bytes get a 1-byte header and are never compressed.
    /// Longer blobs get a 4-byte header; with `ImageCompressionAlgorithm::Zstd`
    /// the data is compressed, and the compressed form is written only if it is
    /// strictly smaller than the input.
    ///
    /// `srcbuf` is always handed back to the caller, also on error.
    ///
    /// # Errors
    ///
    /// `WriteBlobError::Other` if the blob exceeds `MAX_SUPPORTED_BLOB_LEN`,
    /// `WriteBlobError::Flush` if writing the header or the payload fails.
    pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
        &mut self,
        srcbuf: FullSlice<Buf>,
        ctx: &RequestContext,
        algorithm: ImageCompressionAlgorithm,
    ) -> (
        FullSlice<Buf>,
        Result<(u64, CompressionInfo), WriteBlobError>,
    ) {
        // The blob's offset is where its header starts.
        let offset = self.offset;
        let mut compression_info = CompressionInfo {
            written_compressed: false,
            compressed_size: None,
        };

        let len = srcbuf.len();

        // Borrow the reusable header buffer; it is restored right after the
        // async block below, regardless of the outcome.
        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        // Holds the compressed payload if (and only if) it should be written
        // instead of `srcbuf`.
        let mut compressed_buf = None;
        // Header phase. The async block lets the "too large" case bail out early
        // while still yielding `io_buf` and `srcbuf` back to this scope.
        let ((io_buf_slice, hdr_res), srcbuf) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
                let res = res.map_err(WriteBlobError::Flush);
                ((slice, res), srcbuf)
            } else {
                // Write a 4-byte length header
                if len > MAX_SUPPORTED_BLOB_LEN {
                    return (
                        (
                            io_buf.slice_len(),
                            Err(WriteBlobError::Other(anyhow::anyhow!(
                                "blob too large ({len} bytes)"
                            ))),
                        ),
                        srcbuf,
                    );
                }
                let (high_bit_mask, len_written, srcbuf) = match algorithm {
                    ImageCompressionAlgorithm::Zstd { level } => {
                        let mut encoder = if let Some(level) = level {
                            async_compression::tokio::write::ZstdEncoder::with_quality(
                                Vec::new(),
                                Level::Precise(level.into()),
                            )
                        } else {
                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
                        };
                        // The encoder writes into an in-memory `Vec`.
                        encoder.write_all(&srcbuf[..]).await.unwrap();
                        encoder.shutdown().await.unwrap();
                        let compressed = encoder.into_inner();
                        // Recorded even if the compressed form ends up unused.
                        compression_info.compressed_size = Some(compressed.len());
                        if compressed.len() < len {
                            compression_info.written_compressed = true;
                            let compressed_len = compressed.len();
                            compressed_buf = Some(compressed);
                            (BYTE_ZSTD, compressed_len, srcbuf)
                        } else {
                            // Compression did not help: store the original bytes.
                            (BYTE_UNCOMPRESSED, len, srcbuf)
                        }
                    }
                    ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
                };
                // The header stores the length of what is actually written.
                let mut len_buf = (len_written as u32).to_be_bytes();
                // The marker bits must be free; guaranteed by the
                // `MAX_SUPPORTED_BLOB_LEN` check above.
                assert_eq!(len_buf[0] & 0xf0, 0);
                len_buf[0] |= high_bit_mask;
                io_buf.extend_from_slice(&len_buf[..]);
                let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
                let res = res.map_err(WriteBlobError::Flush);
                ((slice, res), srcbuf)
            }
        }
        .await;
        // Put the header buffer back before any early return.
        self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (srcbuf, Err(e)),
        }
        // Payload phase: write the compressed bytes if they were chosen above,
        // otherwise the caller's buffer.
        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
            let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
            (srcbuf, res)
        } else {
            self.write_all(srcbuf, ctx).await
        };
        let res = res.map_err(WriteBlobError::Flush);
        (srcbuf, res.map(|_| (offset, compression_info)))
    }
     399              : 
     400              :     /// Writes a raw blob containing both header and data, returning its offset.
     401         8192 :     pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
     402         8192 :         &mut self,
     403         8192 :         raw_with_header: FullSlice<Buf>,
     404         8192 :         ctx: &RequestContext,
     405         8192 :     ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
     406              :         // Verify the header, to ensure we don't write invalid/corrupt data.
     407         8192 :         let header = match Header::decode(&raw_with_header)
     408         8192 :             .context("decoding blob header")
     409         8192 :             .map_err(WriteBlobError::Other)
     410              :         {
     411         8192 :             Ok(header) => header,
     412            0 :             Err(err) => return (raw_with_header, Err(err)),
     413              :         };
     414         8192 :         if raw_with_header.len() != header.total_len() {
     415            0 :             let header_total_len = header.total_len();
     416            0 :             let raw_len = raw_with_header.len();
     417            0 :             return (
     418            0 :                 raw_with_header,
     419            0 :                 Err(WriteBlobError::Other(anyhow::anyhow!(
     420            0 :                     "header length mismatch: {header_total_len} != {raw_len}"
     421            0 :                 ))),
     422            0 :             );
     423         8192 :         }
     424              : 
     425         8192 :         let offset = self.offset;
     426         8192 :         let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
     427         8192 :         let result = result.map_err(WriteBlobError::Flush);
     428         8192 :         (raw_with_header, result.map(|_| offset))
     429         8192 :     }
     430              : 
     431              :     /// Finish this blob writer and return the underlying `W`.
     432          941 :     pub async fn shutdown(
     433          941 :         self,
     434          941 :         mode: BufferedWriterShutdownMode,
     435          941 :         ctx: &RequestContext,
     436          941 :     ) -> Result<W, FlushTaskError> {
     437          941 :         let (_, file) = self.writer.shutdown(mode, ctx).await?;
     438          941 :         Ok(file)
     439          941 :     }
     440              : }
     441              : 
     442              : #[cfg(test)]
     443              : pub(crate) mod tests {
     444              :     use camino::Utf8PathBuf;
     445              :     use camino_tempfile::Utf8TempDir;
     446              :     use rand::{Rng, SeedableRng};
     447              :     use tracing::info_span;
     448              : 
     449              :     use super::*;
     450              :     use crate::context::DownloadBehavior;
     451              :     use crate::task_mgr::TaskKind;
     452              :     use crate::tenant::block_io::BlockReaderRef;
     453              :     use crate::virtual_file;
     454              :     use crate::virtual_file::TempVirtualFile;
     455              :     use crate::virtual_file::VirtualFile;
     456              : 
    /// Writes `blobs` without compression and checks that each one reads back
    /// unchanged.
    async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
        round_trip_test_compressed(blobs, false).await
    }
     460              : 
     461           12 :     pub(crate) async fn write_maybe_compressed(
     462           12 :         blobs: &[Vec<u8>],
     463           12 :         compression: bool,
     464           12 :         ctx: &RequestContext,
     465           12 :     ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
     466           12 :         let temp_dir = camino_tempfile::tempdir()?;
     467           12 :         let pathbuf = temp_dir.path().join("file");
     468           12 :         let gate = utils::sync::gate::Gate::default();
     469           12 :         let cancel = CancellationToken::new();
     470              : 
     471              :         // Write part (in block to drop the file)
     472           12 :         let mut offsets = Vec::new();
     473              :         {
     474           12 :             let file = TempVirtualFile::new(
     475           12 :                 VirtualFile::open_with_options_v2(
     476           12 :                     pathbuf.as_path(),
     477           12 :                     virtual_file::OpenOptions::new()
     478           12 :                         .create_new(true)
     479           12 :                         .write(true),
     480           12 :                     ctx,
     481           12 :                 )
     482           12 :                 .await?,
     483           12 :                 gate.enter()?,
     484              :             );
     485           12 :             let mut wtr =
     486           12 :                 BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
     487         4132 :             for blob in blobs.iter() {
     488         4132 :                 let (_, res) = if compression {
     489         1040 :                     let res = wtr
     490         1040 :                         .write_blob_maybe_compressed(
     491         1040 :                             blob.clone().slice_len(),
     492         1040 :                             ctx,
     493         1040 :                             ImageCompressionAlgorithm::Zstd { level: Some(1) },
     494         1040 :                         )
     495         1040 :                         .await;
     496         1040 :                     (res.0, res.1.map(|(off, _)| off))
     497              :                 } else {
     498         3092 :                     wtr.write_blob(blob.clone().slice_len(), ctx).await
     499              :                 };
     500         4132 :                 let offs = res?;
     501         4132 :                 offsets.push(offs);
     502              :             }
     503           12 :             let file = wtr
     504           12 :                 .shutdown(
     505           12 :                     BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
     506           12 :                     ctx,
     507           12 :                 )
     508           12 :                 .await?;
     509           12 :             file.disarm_into_inner()
     510              :         };
     511           12 :         Ok((temp_dir, pathbuf, offsets))
     512           12 :     }
     513              : 
     514            8 :     async fn round_trip_test_compressed(
     515            8 :         blobs: &[Vec<u8>],
     516            8 :         compression: bool,
     517            8 :     ) -> anyhow::Result<()> {
     518            8 :         let ctx =
     519            8 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     520            8 :         let (_temp_dir, pathbuf, offsets) =
     521            8 :             write_maybe_compressed(blobs, compression, &ctx).await?;
     522              : 
     523            8 :         println!("Done writing!");
     524            8 :         let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
     525            8 :         let rdr = BlockReaderRef::VirtualFile(&file);
     526            8 :         let rdr = BlockCursor::new_with_compression(rdr, compression);
     527         2072 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     528         2072 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     529         2072 :             assert_eq!(
     530         2072 :                 blob, &blob_read,
     531            0 :                 "mismatch for idx={idx} at offset={offset}"
     532              :             );
     533              :         }
     534            8 :         Ok(())
     535            8 :     }
     536              : 
     537         3079 :     pub(crate) fn random_array(len: usize) -> Vec<u8> {
     538         3079 :         let mut rng = rand::rng();
     539     34075952 :         (0..len).map(|_| rng.random()).collect::<_>()
     540         3079 :     }
     541              : 
     542              :     #[tokio::test]
     543            1 :     async fn test_one() -> anyhow::Result<()> {
     544            1 :         let blobs = &[vec![12, 21, 22]];
     545            1 :         round_trip_test(blobs).await?;
     546            2 :         Ok(())
     547            1 :     }
     548              : 
     549              :     #[tokio::test]
     550            1 :     async fn test_hello_simple() -> anyhow::Result<()> {
     551            1 :         let blobs = &[
     552            1 :             vec![0, 1, 2, 3],
     553            1 :             b"Hello, World!".to_vec(),
     554            1 :             Vec::new(),
     555            1 :             b"foobar".to_vec(),
     556            1 :         ];
     557            1 :         round_trip_test(blobs).await?;
     558            1 :         round_trip_test_compressed(blobs, true).await?;
     559            2 :         Ok(())
     560            1 :     }
     561              : 
     562              :     #[tokio::test]
     563            1 :     async fn test_really_big_array() -> anyhow::Result<()> {
     564            1 :         let blobs = &[
     565            1 :             b"test".to_vec(),
     566            1 :             random_array(10 * PAGE_SZ),
     567            1 :             b"hello".to_vec(),
     568            1 :             random_array(66 * PAGE_SZ),
     569            1 :             vec![0xf3; 24 * PAGE_SZ],
     570            1 :             b"foobar".to_vec(),
     571            1 :         ];
     572            1 :         round_trip_test(blobs).await?;
     573            1 :         round_trip_test_compressed(blobs, true).await?;
     574            2 :         Ok(())
     575            1 :     }
     576              : 
     577              :     #[tokio::test]
     578            1 :     async fn test_arrays_inc() -> anyhow::Result<()> {
     579            1 :         let blobs = (0..PAGE_SZ / 8)
     580         1024 :             .map(|v| random_array(v * 16))
     581            1 :             .collect::<Vec<_>>();
     582            1 :         round_trip_test(&blobs).await?;
     583            2 :         Ok(())
     584            1 :     }
     585              : 
     586              :     #[tokio::test]
     587            1 :     async fn test_arrays_random_size() -> anyhow::Result<()> {
     588            1 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     589            1 :         let blobs = (0..1024)
     590         1024 :             .map(|_| {
     591         1024 :                 let mut sz: u16 = rng.random();
     592              :                 // Make 50% of the arrays small
     593         1024 :                 if rng.random() {
     594          516 :                     sz &= 63;
     595          516 :                 }
     596         1024 :                 random_array(sz.into())
     597         1024 :             })
     598            1 :             .collect::<Vec<_>>();
     599            1 :         round_trip_test(&blobs).await?;
     600            2 :         Ok(())
     601            1 :     }
     602              : 
     603              :     #[tokio::test]
     604            1 :     async fn test_arrays_page_boundary() -> anyhow::Result<()> {
     605            1 :         let blobs = &[
     606            1 :             random_array(PAGE_SZ - 4),
     607            1 :             random_array(PAGE_SZ - 4),
     608            1 :             random_array(PAGE_SZ - 4),
     609            1 :         ];
     610            1 :         round_trip_test(blobs).await?;
     611            2 :         Ok(())
     612            1 :     }
     613              : }
         |