Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a one byte. If it's larger than that, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs larger than 128 bits,
10 : //! we also specify three reserved bits, only one of the three bit
11 : //! patterns is currently in use (0b011) and signifies compression
12 : //! with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
17 : use std::cmp::min;
18 :
19 : use anyhow::Context;
20 : use async_compression::Level;
21 : use bytes::{BufMut, BytesMut};
22 : use pageserver_api::models::ImageCompressionAlgorithm;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::IoBuf;
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::warn;
27 :
28 : use crate::context::RequestContext;
29 : use crate::page_cache::PAGE_SZ;
30 : use crate::tenant::block_io::BlockCursor;
31 : use crate::virtual_file::IoBufferMut;
32 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
33 : use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
34 : use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter};
35 :
36 : #[derive(Copy, Clone, Debug)]
37 : pub struct CompressionInfo {
38 : pub written_compressed: bool,
39 : pub compressed_size: Option<usize>,
40 : }
41 :
42 : /// A blob header, with header+data length and compression info.
43 : ///
44 : /// TODO: use this more widely, and add an encode() method too.
45 : /// TODO: document the header format.
46 : #[derive(Clone, Copy, Default)]
47 : pub struct Header {
48 : pub header_len: usize,
49 : pub data_len: usize,
50 : pub compression_bits: u8,
51 : }
52 :
53 : impl Header {
54 : /// Decodes a header from a byte slice.
55 23347442 : pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
56 23347442 : let Some(&first_header_byte) = bytes.first() else {
57 0 : anyhow::bail!("zero-length blob header");
58 : };
59 :
60 : // If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
61 23347442 : if first_header_byte < 0x80 {
62 23096858 : return Ok(Self {
63 23096858 : header_len: 1, // by definition
64 23096858 : data_len: first_header_byte as usize,
65 23096858 : compression_bits: BYTE_UNCOMPRESSED,
66 23096858 : });
67 250584 : }
68 :
69 : // Otherwise, this is a 4-byte header containing compression information and length.
70 : const HEADER_LEN: usize = 4;
71 250584 : let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN]
72 250584 : .try_into()
73 250584 : .map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?;
74 :
75 : // TODO: verify the compression bits and convert to an enum.
76 250584 : let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
77 250584 : header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
78 250584 : let data_len = u32::from_be_bytes(header_buf) as usize;
79 250584 :
80 250584 : Ok(Self {
81 250584 : header_len: HEADER_LEN,
82 250584 : data_len,
83 250584 : compression_bits,
84 250584 : })
85 23347442 : }
86 :
87 : /// Returns the total header+data length.
88 122976 : pub fn total_len(&self) -> usize {
89 122976 : self.header_len + self.data_len
90 122976 : }
91 : }
92 :
93 : #[derive(Debug, thiserror::Error)]
94 : pub enum WriteBlobError {
95 : #[error(transparent)]
96 : Flush(FlushTaskError),
97 : #[error("blob too large ({len} bytes)")]
98 : BlobTooLarge { len: usize },
99 : #[error(transparent)]
100 : WriteBlobRaw(anyhow::Error),
101 : }
102 :
103 : impl BlockCursor<'_> {
104 : /// Read a blob into a new buffer.
105 24912 : pub async fn read_blob(
106 24912 : &self,
107 24912 : offset: u64,
108 24912 : ctx: &RequestContext,
109 24912 : ) -> Result<Vec<u8>, std::io::Error> {
110 24912 : let mut buf = Vec::new();
111 24912 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
112 24912 : Ok(buf)
113 24912 : }
114 : /// Read blob into the given buffer. Any previous contents in the buffer
115 : /// are overwritten.
116 25296 : pub async fn read_blob_into_buf(
117 25296 : &self,
118 25296 : offset: u64,
119 25296 : dstbuf: &mut Vec<u8>,
120 25296 : ctx: &RequestContext,
121 25296 : ) -> Result<(), std::io::Error> {
122 25296 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
123 25296 : let mut off = (offset % PAGE_SZ as u64) as usize;
124 :
125 25296 : let mut buf = self.read_blk(blknum, ctx).await?;
126 :
127 : // peek at the first byte, to determine if it's a 1- or 4-byte length
128 25296 : let first_len_byte = buf[off];
129 25296 : let len: usize = if first_len_byte < 0x80 {
130 : // 1-byte length header
131 6864 : off += 1;
132 6864 : first_len_byte as usize
133 : } else {
134 : // 4-byte length header
135 18432 : let mut len_buf = [0u8; 4];
136 18432 : let thislen = PAGE_SZ - off;
137 18432 : if thislen < 4 {
138 : // it is split across two pages
139 0 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
140 0 : blknum += 1;
141 0 : buf = self.read_blk(blknum, ctx).await?;
142 0 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
143 0 : off = 4 - thislen;
144 18432 : } else {
145 18432 : len_buf.copy_from_slice(&buf[off..off + 4]);
146 18432 : off += 4;
147 18432 : }
148 18432 : let bit_mask = if self.read_compressed {
149 84 : !LEN_COMPRESSION_BIT_MASK
150 : } else {
151 18348 : 0x7f
152 : };
153 18432 : len_buf[0] &= bit_mask;
154 18432 : u32::from_be_bytes(len_buf) as usize
155 : };
156 25296 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
157 25296 :
158 25296 : let mut tmp_buf = Vec::new();
159 : let buf_to_write;
160 25296 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
161 25284 : if compression_bits > BYTE_UNCOMPRESSED {
162 0 : warn!("reading key above future limit ({len} bytes)");
163 25284 : }
164 25284 : buf_to_write = dstbuf;
165 25284 : None
166 12 : } else if compression_bits == BYTE_ZSTD {
167 12 : buf_to_write = &mut tmp_buf;
168 12 : Some(dstbuf)
169 : } else {
170 0 : let error = std::io::Error::new(
171 0 : std::io::ErrorKind::InvalidData,
172 0 : format!("invalid compression byte {compression_bits:x}"),
173 0 : );
174 0 : return Err(error);
175 : };
176 :
177 25296 : buf_to_write.clear();
178 25296 : buf_to_write.reserve(len);
179 25296 :
180 25296 : // Read the payload
181 25296 : let mut remain = len;
182 89124 : while remain > 0 {
183 63828 : let mut page_remain = PAGE_SZ - off;
184 63828 : if page_remain == 0 {
185 : // continue on next page
186 38652 : blknum += 1;
187 38652 : buf = self.read_blk(blknum, ctx).await?;
188 38652 : off = 0;
189 38652 : page_remain = PAGE_SZ;
190 25176 : }
191 63828 : let this_blk_len = min(remain, page_remain);
192 63828 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
193 63828 : remain -= this_blk_len;
194 63828 : off += this_blk_len;
195 : }
196 :
197 25296 : if let Some(dstbuf) = compression {
198 12 : if compression_bits == BYTE_ZSTD {
199 12 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
200 12 : decoder.write_all(buf_to_write).await?;
201 12 : decoder.flush().await?;
202 : } else {
203 0 : unreachable!("already checked above")
204 : }
205 25284 : }
206 :
207 25296 : Ok(())
208 25296 : }
209 : }
210 :
211 : /// Reserved bits for length and compression
212 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
213 :
214 : /// The maximum size of blobs we support. The highest few bits
215 : /// are reserved for compression and other further uses.
216 : pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
217 :
218 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
219 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
220 :
221 : /// A wrapper of `VirtualFile` that allows users to write blobs.
222 : pub struct BlobWriter<W> {
223 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
224 : io_buf: Option<BytesMut>,
225 : writer: BufferedWriter<IoBufferMut, W>,
226 : offset: u64,
227 : }
228 :
229 : impl<W> BlobWriter<W>
230 : where
231 : W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
232 : {
233 : /// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
234 12744 : pub fn new(
235 12744 : file: W,
236 12744 : start_offset: u64,
237 12744 : gate: &utils::sync::gate::Gate,
238 12744 : cancel: CancellationToken,
239 12744 : ctx: &RequestContext,
240 12744 : flush_task_span: tracing::Span,
241 12744 : ) -> anyhow::Result<Self> {
242 12744 : Ok(Self {
243 12744 : io_buf: Some(BytesMut::new()),
244 12744 : writer: BufferedWriter::new(
245 12744 : file,
246 12744 : start_offset,
247 25488 : || IoBufferMut::with_capacity(Self::CAPACITY),
248 12744 : gate.enter()?,
249 12744 : cancel,
250 12744 : ctx,
251 12744 : flush_task_span,
252 12744 : ),
253 12744 : offset: start_offset,
254 : })
255 12744 : }
256 :
257 12299256 : pub fn size(&self) -> u64 {
258 12299256 : self.offset
259 12299256 : }
260 :
261 : const CAPACITY: usize = 64 * 1024;
262 :
263 : /// Writes `src_buf` to the file at the current offset.
264 78570864 : async fn write_all<Buf: IoBuf + Send>(
265 78570864 : &mut self,
266 78570864 : src_buf: FullSlice<Buf>,
267 78570864 : ctx: &RequestContext,
268 78570864 : ) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
269 78570864 : let res = self
270 78570864 : .writer
271 78570864 : // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
272 78570864 : // Can remove all the complexity around owned buffers upstack
273 78570864 : .write_buffered_borrowed(&src_buf, ctx)
274 78570864 : .await
275 78570864 : .map(|len| {
276 78570864 : self.offset += len as u64;
277 78570864 : });
278 78570864 :
279 78570864 : (src_buf, res)
280 78570864 : }
281 :
282 : /// Write a blob of data. Returns the offset that it was written to,
283 : /// which can be used to retrieve the data later.
284 37104 : pub async fn write_blob<Buf: IoBuf + Send>(
285 37104 : &mut self,
286 37104 : srcbuf: FullSlice<Buf>,
287 37104 : ctx: &RequestContext,
288 37104 : ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
289 37104 : let (buf, res) = self
290 37104 : .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
291 37104 : .await;
292 37104 : (buf, res.map(|(off, _compression_info)| off))
293 37104 : }
294 :
295 : /// Write a blob of data. Returns the offset that it was written to,
296 : /// which can be used to retrieve the data later.
297 39236280 : pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
298 39236280 : &mut self,
299 39236280 : srcbuf: FullSlice<Buf>,
300 39236280 : ctx: &RequestContext,
301 39236280 : algorithm: ImageCompressionAlgorithm,
302 39236280 : ) -> (
303 39236280 : FullSlice<Buf>,
304 39236280 : Result<(u64, CompressionInfo), WriteBlobError>,
305 39236280 : ) {
306 39236280 : let offset = self.offset;
307 39236280 : let mut compression_info = CompressionInfo {
308 39236280 : written_compressed: false,
309 39236280 : compressed_size: None,
310 39236280 : };
311 39236280 :
312 39236280 : let len = srcbuf.len();
313 39236280 :
314 39236280 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
315 39236280 : io_buf.clear();
316 39236280 : let mut compressed_buf = None;
317 39236280 : let ((io_buf_slice, hdr_res), srcbuf) = async {
318 39236280 : if len < 128 {
319 : // Short blob. Write a 1-byte length header
320 39036696 : io_buf.put_u8(len as u8);
321 39036696 : let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
322 39036696 : let res = res.map_err(WriteBlobError::Flush);
323 39036696 : ((slice, res), srcbuf)
324 : } else {
325 : // Write a 4-byte length header
326 199584 : if len > MAX_SUPPORTED_BLOB_LEN {
327 0 : return (
328 0 : (
329 0 : io_buf.slice_len(),
330 0 : Err(WriteBlobError::BlobTooLarge { len }),
331 0 : ),
332 0 : srcbuf,
333 0 : );
334 199584 : }
335 199584 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
336 60276 : ImageCompressionAlgorithm::Zstd { level } => {
337 60276 : let mut encoder = if let Some(level) = level {
338 60276 : async_compression::tokio::write::ZstdEncoder::with_quality(
339 60276 : Vec::new(),
340 60276 : Level::Precise(level.into()),
341 60276 : )
342 : } else {
343 0 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
344 : };
345 60276 : encoder.write_all(&srcbuf[..]).await.unwrap();
346 60276 : encoder.shutdown().await.unwrap();
347 60276 : let compressed = encoder.into_inner();
348 60276 : compression_info.compressed_size = Some(compressed.len());
349 60276 : if compressed.len() < len {
350 24 : compression_info.written_compressed = true;
351 24 : let compressed_len = compressed.len();
352 24 : compressed_buf = Some(compressed);
353 24 : (BYTE_ZSTD, compressed_len, srcbuf)
354 : } else {
355 60252 : (BYTE_UNCOMPRESSED, len, srcbuf)
356 : }
357 : }
358 139308 : ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
359 : };
360 199584 : let mut len_buf = (len_written as u32).to_be_bytes();
361 199584 : assert_eq!(len_buf[0] & 0xf0, 0);
362 199584 : len_buf[0] |= high_bit_mask;
363 199584 : io_buf.extend_from_slice(&len_buf[..]);
364 199584 : let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
365 199584 : let res = res.map_err(WriteBlobError::Flush);
366 199584 : ((slice, res), srcbuf)
367 : }
368 39236280 : }
369 39236280 : .await;
370 39236280 : self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
371 39236280 : match hdr_res {
372 39236280 : Ok(_) => (),
373 0 : Err(e) => return (srcbuf, Err(e)),
374 : }
375 39236280 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
376 24 : let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
377 24 : (srcbuf, res)
378 : } else {
379 39236256 : self.write_all(srcbuf, ctx).await
380 : };
381 39236280 : let res = res.map_err(WriteBlobError::Flush);
382 39236280 : (srcbuf, res.map(|_| (offset, compression_info)))
383 39236280 : }
384 :
385 : /// Writes a raw blob containing both header and data, returning its offset.
386 98304 : pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
387 98304 : &mut self,
388 98304 : raw_with_header: FullSlice<Buf>,
389 98304 : ctx: &RequestContext,
390 98304 : ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
391 : // Verify the header, to ensure we don't write invalid/corrupt data.
392 98304 : let header = match Header::decode(&raw_with_header)
393 98304 : .context("decoding blob header")
394 98304 : .map_err(WriteBlobError::WriteBlobRaw)
395 : {
396 98304 : Ok(header) => header,
397 0 : Err(err) => return (raw_with_header, Err(err)),
398 : };
399 98304 : if raw_with_header.len() != header.total_len() {
400 0 : let header_total_len = header.total_len();
401 0 : let raw_len = raw_with_header.len();
402 0 : return (
403 0 : raw_with_header,
404 0 : Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
405 0 : "header length mismatch: {header_total_len} != {raw_len}"
406 0 : ))),
407 0 : );
408 98304 : }
409 98304 :
410 98304 : let offset = self.offset;
411 98304 : let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
412 98304 : let result = result.map_err(WriteBlobError::Flush);
413 98304 : (raw_with_header, result.map(|_| offset))
414 98304 : }
415 :
416 : /// Finish this blob writer and return the underlying `W`.
417 11124 : pub async fn shutdown(
418 11124 : self,
419 11124 : mode: BufferedWriterShutdownMode,
420 11124 : ctx: &RequestContext,
421 11124 : ) -> Result<W, FlushTaskError> {
422 11124 : let (_, file) = self.writer.shutdown(mode, ctx).await?;
423 11124 : Ok(file)
424 11124 : }
425 : }
426 :
427 : #[cfg(test)]
428 : pub(crate) mod tests {
429 : use camino::Utf8PathBuf;
430 : use camino_tempfile::Utf8TempDir;
431 : use rand::{Rng, SeedableRng};
432 : use tracing::info_span;
433 :
434 : use super::*;
435 : use crate::context::DownloadBehavior;
436 : use crate::task_mgr::TaskKind;
437 : use crate::tenant::block_io::BlockReaderRef;
438 : use crate::virtual_file;
439 : use crate::virtual_file::TempVirtualFile;
440 : use crate::virtual_file::VirtualFile;
441 :
442 72 : async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
443 72 : round_trip_test_compressed(blobs, false).await
444 72 : }
445 :
446 144 : pub(crate) async fn write_maybe_compressed(
447 144 : blobs: &[Vec<u8>],
448 144 : compression: bool,
449 144 : ctx: &RequestContext,
450 144 : ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
451 144 : let temp_dir = camino_tempfile::tempdir()?;
452 144 : let pathbuf = temp_dir.path().join("file");
453 144 : let gate = utils::sync::gate::Gate::default();
454 144 : let cancel = CancellationToken::new();
455 144 :
456 144 : // Write part (in block to drop the file)
457 144 : let mut offsets = Vec::new();
458 : {
459 144 : let file = TempVirtualFile::new(
460 144 : VirtualFile::open_with_options_v2(
461 144 : pathbuf.as_path(),
462 144 : virtual_file::OpenOptions::new()
463 144 : .create_new(true)
464 144 : .write(true),
465 144 : ctx,
466 144 : )
467 144 : .await?,
468 144 : gate.enter()?,
469 : );
470 144 : let mut wtr =
471 144 : BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
472 49584 : for blob in blobs.iter() {
473 49584 : let (_, res) = if compression {
474 12480 : let res = wtr
475 12480 : .write_blob_maybe_compressed(
476 12480 : blob.clone().slice_len(),
477 12480 : ctx,
478 12480 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
479 12480 : )
480 12480 : .await;
481 12480 : (res.0, res.1.map(|(off, _)| off))
482 : } else {
483 37104 : wtr.write_blob(blob.clone().slice_len(), ctx).await
484 : };
485 49584 : let offs = res?;
486 49584 : offsets.push(offs);
487 : }
488 144 : let file = wtr
489 144 : .shutdown(
490 144 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
491 144 : ctx,
492 144 : )
493 144 : .await?;
494 144 : file.disarm_into_inner()
495 144 : };
496 144 : Ok((temp_dir, pathbuf, offsets))
497 144 : }
498 :
499 96 : async fn round_trip_test_compressed(
500 96 : blobs: &[Vec<u8>],
501 96 : compression: bool,
502 96 : ) -> anyhow::Result<()> {
503 96 : let ctx =
504 96 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
505 96 : let (_temp_dir, pathbuf, offsets) =
506 96 : write_maybe_compressed(blobs, compression, &ctx).await?;
507 :
508 96 : println!("Done writing!");
509 96 : let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
510 96 : let rdr = BlockReaderRef::VirtualFile(&file);
511 96 : let rdr = BlockCursor::new_with_compression(rdr, compression);
512 24864 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
513 24864 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
514 24864 : assert_eq!(
515 24864 : blob, &blob_read,
516 0 : "mismatch for idx={idx} at offset={offset}"
517 : );
518 : }
519 96 : Ok(())
520 96 : }
521 :
522 36948 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
523 36948 : let mut rng = rand::thread_rng();
524 408911424 : (0..len).map(|_| rng.r#gen()).collect::<_>()
525 36948 : }
526 :
527 : #[tokio::test]
528 12 : async fn test_one() -> anyhow::Result<()> {
529 12 : let blobs = &[vec![12, 21, 22]];
530 12 : round_trip_test(blobs).await?;
531 12 : Ok(())
532 12 : }
533 :
534 : #[tokio::test]
535 12 : async fn test_hello_simple() -> anyhow::Result<()> {
536 12 : let blobs = &[
537 12 : vec![0, 1, 2, 3],
538 12 : b"Hello, World!".to_vec(),
539 12 : Vec::new(),
540 12 : b"foobar".to_vec(),
541 12 : ];
542 12 : round_trip_test(blobs).await?;
543 12 : round_trip_test_compressed(blobs, true).await?;
544 12 : Ok(())
545 12 : }
546 :
547 : #[tokio::test]
548 12 : async fn test_really_big_array() -> anyhow::Result<()> {
549 12 : let blobs = &[
550 12 : b"test".to_vec(),
551 12 : random_array(10 * PAGE_SZ),
552 12 : b"hello".to_vec(),
553 12 : random_array(66 * PAGE_SZ),
554 12 : vec![0xf3; 24 * PAGE_SZ],
555 12 : b"foobar".to_vec(),
556 12 : ];
557 12 : round_trip_test(blobs).await?;
558 12 : round_trip_test_compressed(blobs, true).await?;
559 12 : Ok(())
560 12 : }
561 :
562 : #[tokio::test]
563 12 : async fn test_arrays_inc() -> anyhow::Result<()> {
564 12 : let blobs = (0..PAGE_SZ / 8)
565 12288 : .map(|v| random_array(v * 16))
566 12 : .collect::<Vec<_>>();
567 12 : round_trip_test(&blobs).await?;
568 12 : Ok(())
569 12 : }
570 :
571 : #[tokio::test]
572 12 : async fn test_arrays_random_size() -> anyhow::Result<()> {
573 12 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
574 12 : let blobs = (0..1024)
575 12288 : .map(|_| {
576 12288 : let mut sz: u16 = rng.r#gen();
577 12288 : // Make 50% of the arrays small
578 12288 : if rng.r#gen() {
579 6192 : sz &= 63;
580 6192 : }
581 12288 : random_array(sz.into())
582 12288 : })
583 12 : .collect::<Vec<_>>();
584 12 : round_trip_test(&blobs).await?;
585 12 : Ok(())
586 12 : }
587 :
588 : #[tokio::test]
589 12 : async fn test_arrays_page_boundary() -> anyhow::Result<()> {
590 12 : let blobs = &[
591 12 : random_array(PAGE_SZ - 4),
592 12 : random_array(PAGE_SZ - 4),
593 12 : random_array(PAGE_SZ - 4),
594 12 : ];
595 12 : round_trip_test(blobs).await?;
596 12 : Ok(())
597 12 : }
598 : }
|