//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is less than 128 bytes, it is written as
//! a single byte. Otherwise, the length is written as a four-byte
//! big-endian integer with the high bit set. This way, we can detect
//! whether it's a 1- or 4-byte header by peeking at the first byte. For
//! blobs of 128 bytes or more, the three bits below the high bit are
//! reserved; only one bit pattern is currently in use, 0b001, which
//! signifies compression with zstd.
//!
//! len <  128: 0XXXXXXX
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
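//! As a worked example of this scheme (illustrative only, not code from
//! this module): a 10-byte blob is written with the single header byte
//! 0x0A, while a 1000-byte uncompressed blob gets a 4-byte header:
//!
//! ```
//! // 1-byte header: the length itself, with the high bit clear.
//! assert!(10u8 < 0x80); // the header is just [0x0A]
//!
//! // 4-byte header: big-endian length, with the high bit set in the first
//! // byte (0x80 = uncompressed, 0x90 = zstd-compressed payload length).
//! let mut header = 1000u32.to_be_bytes();
//! header[0] |= 0x80;
//! assert_eq!(header, [0x80, 0x00, 0x03, 0xE8]);
//! ```
//!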
use std::cmp::min;

use anyhow::Context;
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{
    BufferedWriter, BufferedWriterShutdownMode, FlushTaskError, OwnedAsyncWriter,
};

#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
    pub written_compressed: bool,
    pub compressed_size: Option<usize>,
}

/// A blob header, with header+data length and compression info.
///
/// TODO: use this more widely, and add an encode() method too.
/// TODO: document the header format.
#[derive(Clone, Copy, Default)]
pub struct Header {
    pub header_len: usize,
    pub data_len: usize,
    pub compression_bits: u8,
}

impl Header {
    /// Decodes a header from a byte slice.
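    ///
    /// A minimal sketch of expected inputs and outputs (hypothetical values;
    /// not a doc test from this module):
    ///
    /// ```ignore
    /// // 0x0A: high bit clear, so a 1-byte header for a 10-byte payload.
    /// let h = Header::decode(&[0x0A]).unwrap();
    /// assert_eq!((h.header_len, h.data_len), (1, 10));
    ///
    /// // 0x90...: high bit set, compression bits 0b001 (zstd), followed by
    /// // the big-endian payload length 1000.
    /// let h = Header::decode(&[0x90, 0x00, 0x03, 0xE8]).unwrap();
    /// assert_eq!((h.header_len, h.data_len), (4, 1000));
    /// ```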
    pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
        let Some(&first_header_byte) = bytes.first() else {
            anyhow::bail!("zero-length blob header");
        };

        // If the first bit is 0, this is just a 1-byte length prefix, for lengths below 128 bytes.
        if first_header_byte < 0x80 {
            return Ok(Self {
                header_len: 1, // by definition
                data_len: first_header_byte as usize,
                compression_bits: BYTE_UNCOMPRESSED,
            });
        }

        // Otherwise, this is a 4-byte header containing compression information and length.
        // Use a checked subslice so that a short header returns an error instead
        // of panicking on the slice index.
        const HEADER_LEN: usize = 4;
        let mut header_buf: [u8; HEADER_LEN] = bytes
            .get(0..HEADER_LEN)
            .and_then(|slice| slice.try_into().ok())
            .ok_or_else(|| anyhow::anyhow!("blob header too short: {bytes:?}"))?;

        // TODO: verify the compression bits and convert to an enum.
        let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
        header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
        let data_len = u32::from_be_bytes(header_buf) as usize;

        Ok(Self {
            header_len: HEADER_LEN,
            data_len,
            compression_bits,
        })
    }

    /// Returns the total header+data length.
    pub fn total_len(&self) -> usize {
        self.header_len + self.data_len
    }
}

#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
    #[error(transparent)]
    Flush(FlushTaskError),
    #[error(transparent)]
    Other(anyhow::Error),
}

impl WriteBlobError {
    pub fn is_cancel(&self) -> bool {
        match self {
            WriteBlobError::Flush(e) => e.is_cancel(),
            WriteBlobError::Other(_) => false,
        }
    }

    pub fn into_anyhow(self) -> anyhow::Error {
        match self {
            WriteBlobError::Flush(e) => e.into_anyhow(),
            WriteBlobError::Other(e) => e,
        }
    }
}

impl BlockCursor<'_> {
    /// Read a blob into a new buffer.
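    ///
    /// Usage sketch (not a doc test; `cursor`, `offset`, and `ctx` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// // `offset` must point at a blob header, e.g. one previously
    /// // returned by `BlobWriter::write_blob`.
    /// let data: Vec<u8> = cursor.read_blob(offset, ctx).await?;
    /// ```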
    pub async fn read_blob(
        &self,
        offset: u64,
        ctx: &RequestContext,
    ) -> Result<Vec<u8>, std::io::Error> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf, ctx).await?;
        Ok(buf)
    }

    /// Read a blob into the given buffer. Any previous contents in the
    /// buffer are overwritten.
    pub async fn read_blob_into_buf(
        &self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
        ctx: &RequestContext,
    ) -> Result<(), std::io::Error> {
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum, ctx).await?;

        // peek at the first byte, to determine if it's a 1- or 4-byte length
        let first_len_byte = buf[off];
        let len: usize = if first_len_byte < 0x80 {
            // 1-byte length header
            off += 1;
            first_len_byte as usize
        } else {
            // 4-byte length header
            let mut len_buf = [0u8; 4];
            let thislen = PAGE_SZ - off;
            if thislen < 4 {
                // it is split across two pages
                len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
                off = 4 - thislen;
            } else {
                len_buf.copy_from_slice(&buf[off..off + 4]);
                off += 4;
            }
            let bit_mask = if self.read_compressed {
                !LEN_COMPRESSION_BIT_MASK
            } else {
                0x7f
            };
            len_buf[0] &= bit_mask;
            u32::from_be_bytes(len_buf) as usize
        };
        let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

        let mut tmp_buf = Vec::new();
        let buf_to_write;
        let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
            if compression_bits > BYTE_UNCOMPRESSED {
                warn!("reading key above future limit ({len} bytes)");
            }
            buf_to_write = dstbuf;
            None
        } else if compression_bits == BYTE_ZSTD {
            buf_to_write = &mut tmp_buf;
            Some(dstbuf)
        } else {
            let error = std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid compression byte {compression_bits:x}"),
            );
            return Err(error);
        };

        buf_to_write.clear();
        buf_to_write.reserve(len);

        // Read the payload
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // continue on next page
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }

        if let Some(dstbuf) = compression {
            if compression_bits == BYTE_ZSTD {
                let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
                decoder.write_all(buf_to_write).await?;
                decoder.flush().await?;
            } else {
                unreachable!("already checked above")
            }
        }

        Ok(())
    }
}

/// Reserved bits for length and compression
pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other future uses.
pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;

pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
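
// Compile-time sanity checks of the bit layout described in the module docs
// (added for illustration): the zstd pattern sits within the reserved
// compression bits, and the maximum supported blob length fills exactly the
// 28 bits left over by the 4-byte header.
const _: () = {
    assert!(BYTE_ZSTD & LEN_COMPRESSION_BIT_MASK == BYTE_ZSTD);
    assert!(MAX_SUPPORTED_BLOB_LEN == (1 << 28) - 1);
};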

/// A wrapper of `VirtualFile` that allows users to write blobs.
pub struct BlobWriter<W> {
    /// We do tiny writes for the length headers; they need to be in an owned buffer.
    io_buf: Option<BytesMut>,
    writer: BufferedWriter<IoBufferMut, W>,
    offset: u64,
}

impl<W> BlobWriter<W>
where
    W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
{
    /// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
    pub fn new(
        file: W,
        start_offset: u64,
        gate: &utils::sync::gate::Gate,
        cancel: CancellationToken,
        ctx: &RequestContext,
        flush_task_span: tracing::Span,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            io_buf: Some(BytesMut::new()),
            writer: BufferedWriter::new(
                file,
                start_offset,
                || IoBufferMut::with_capacity(Self::CAPACITY),
                gate.enter()?,
                cancel,
                ctx,
                flush_task_span,
            ),
            offset: start_offset,
        })
    }

    pub fn size(&self) -> u64 {
        self.offset
    }

    const CAPACITY: usize = 64 * 1024;

    /// Writes `src_buf` to the file at the current offset.
    async fn write_all<Buf: IoBuf + Send>(
        &mut self,
        src_buf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
        let res = self
            .writer
            // TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
            // Can remove all the complexity around owned buffers upstack
            .write_buffered_borrowed(&src_buf, ctx)
            .await
            .map(|len| {
                self.offset += len as u64;
            });

        (src_buf, res)
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob<Buf: IoBuf + Send>(
        &mut self,
        srcbuf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
        let (buf, res) = self
            .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
            .await;
        (buf, res.map(|(off, _compression_info)| off))
    }

    /// Like [`Self::write_blob`], but optionally compresses the data with
    /// the given algorithm. Returns the offset that the blob was written
    /// to, which can be used to retrieve the data later.
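    ///
    /// Blobs shorter than 128 bytes always use the 1-byte header and are
    /// never compressed. Larger blobs are stored compressed only if that
    /// actually shrinks them; otherwise the uncompressed bytes are written
    /// (see [`CompressionInfo::written_compressed`]). Usage sketch (not a
    /// doc test; `writer`, `blob`, and `ctx` are assumed to be in scope):
    ///
    /// ```ignore
    /// let (blob, res) = writer
    ///     .write_blob_maybe_compressed(
    ///         blob.slice_len(),
    ///         ctx,
    ///         ImageCompressionAlgorithm::Zstd { level: Some(1) },
    ///     )
    ///     .await;
    /// let (offset, compression_info) = res?;
    /// ```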
    pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
        &mut self,
        srcbuf: FullSlice<Buf>,
        ctx: &RequestContext,
        algorithm: ImageCompressionAlgorithm,
    ) -> (
        FullSlice<Buf>,
        Result<(u64, CompressionInfo), WriteBlobError>,
    ) {
        let offset = self.offset;
        let mut compression_info = CompressionInfo {
            written_compressed: false,
            compressed_size: None,
        };

        let len = srcbuf.len();

        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        let mut compressed_buf = None;
        let ((io_buf_slice, hdr_res), srcbuf) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
                let res = res.map_err(WriteBlobError::Flush);
                ((slice, res), srcbuf)
            } else {
                // Write a 4-byte length header
                if len > MAX_SUPPORTED_BLOB_LEN {
                    return (
                        (
                            io_buf.slice_len(),
                            Err(WriteBlobError::Other(anyhow::anyhow!(
                                "blob too large ({len} bytes)"
                            ))),
                        ),
                        srcbuf,
                    );
                }
                let (high_bit_mask, len_written, srcbuf) = match algorithm {
                    ImageCompressionAlgorithm::Zstd { level } => {
                        let mut encoder = if let Some(level) = level {
                            async_compression::tokio::write::ZstdEncoder::with_quality(
                                Vec::new(),
                                Level::Precise(level.into()),
                            )
                        } else {
                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
                        };
                        encoder.write_all(&srcbuf[..]).await.unwrap();
                        encoder.shutdown().await.unwrap();
                        let compressed = encoder.into_inner();
                        compression_info.compressed_size = Some(compressed.len());
                        if compressed.len() < len {
                            compression_info.written_compressed = true;
                            let compressed_len = compressed.len();
                            compressed_buf = Some(compressed);
                            (BYTE_ZSTD, compressed_len, srcbuf)
                        } else {
                            (BYTE_UNCOMPRESSED, len, srcbuf)
                        }
                    }
                    ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
                };
                let mut len_buf = (len_written as u32).to_be_bytes();
                assert_eq!(len_buf[0] & 0xf0, 0);
                len_buf[0] |= high_bit_mask;
                io_buf.extend_from_slice(&len_buf[..]);
                let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
                let res = res.map_err(WriteBlobError::Flush);
                ((slice, res), srcbuf)
            }
        }
        .await;
        self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (srcbuf, Err(e)),
        }
        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
            let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
            (srcbuf, res)
        } else {
            self.write_all(srcbuf, ctx).await
        };
        let res = res.map_err(WriteBlobError::Flush);
        (srcbuf, res.map(|_| (offset, compression_info)))
    }

    /// Writes a raw blob containing both header and data, returning its offset.
    pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
        &mut self,
        raw_with_header: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
        // Verify the header, to ensure we don't write invalid/corrupt data.
        let header = match Header::decode(&raw_with_header)
            .context("decoding blob header")
            .map_err(WriteBlobError::Other)
        {
            Ok(header) => header,
            Err(err) => return (raw_with_header, Err(err)),
        };
        if raw_with_header.len() != header.total_len() {
            let header_total_len = header.total_len();
            let raw_len = raw_with_header.len();
            return (
                raw_with_header,
                Err(WriteBlobError::Other(anyhow::anyhow!(
                    "header length mismatch: {header_total_len} != {raw_len}"
                ))),
            );
        }

        let offset = self.offset;
        let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
        let result = result.map_err(WriteBlobError::Flush);
        (raw_with_header, result.map(|_| offset))
    }

    /// Finish this blob writer and return the underlying `W`.
    pub async fn shutdown(
        self,
        mode: BufferedWriterShutdownMode,
        ctx: &RequestContext,
    ) -> Result<W, FlushTaskError> {
        let (_, file) = self.writer.shutdown(mode, ctx).await?;
        Ok(file)
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use camino::Utf8PathBuf;
    use camino_tempfile::Utf8TempDir;
    use rand::{Rng, SeedableRng};
    use tracing::info_span;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;
    use crate::virtual_file;
    use crate::virtual_file::TempVirtualFile;
    use crate::virtual_file::VirtualFile;

    async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
        round_trip_test_compressed(blobs, false).await
    }

    pub(crate) async fn write_maybe_compressed(
        blobs: &[Vec<u8>],
        compression: bool,
        ctx: &RequestContext,
    ) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        // Write part (in a block, to drop the file)
        let mut offsets = Vec::new();
        {
            let file = TempVirtualFile::new(
                VirtualFile::open_with_options_v2(
                    pathbuf.as_path(),
                    virtual_file::OpenOptions::new()
                        .create_new(true)
                        .write(true),
                    ctx,
                )
                .await?,
                gate.enter()?,
            );
            let mut wtr =
                BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
            for blob in blobs.iter() {
                let (_, res) = if compression {
                    let res = wtr
                        .write_blob_maybe_compressed(
                            blob.clone().slice_len(),
                            ctx,
                            ImageCompressionAlgorithm::Zstd { level: Some(1) },
                        )
                        .await;
                    (res.0, res.1.map(|(off, _)| off))
                } else {
                    wtr.write_blob(blob.clone().slice_len(), ctx).await
                };
                let offs = res?;
                offsets.push(offs);
            }
            let file = wtr
                .shutdown(
                    BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
                    ctx,
                )
                .await?;
            file.disarm_into_inner()
        };
        Ok((temp_dir, pathbuf, offsets))
    }

    async fn round_trip_test_compressed(
        blobs: &[Vec<u8>],
        compression: bool,
    ) -> anyhow::Result<()> {
        let ctx =
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
        let (_temp_dir, pathbuf, offsets) =
            write_maybe_compressed(blobs, compression, &ctx).await?;

        println!("Done writing!");
        let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new_with_compression(rdr, compression);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    pub(crate) fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.r#gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> anyhow::Result<()> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> anyhow::Result<()> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test(blobs).await?;
        round_trip_test_compressed(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> anyhow::Result<()> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"hello".to_vec(),
            random_array(66 * PAGE_SZ),
            vec![0xf3; 24 * PAGE_SZ],
            b"foobar".to_vec(),
        ];
        round_trip_test(blobs).await?;
        round_trip_test_compressed(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> anyhow::Result<()> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> anyhow::Result<()> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.r#gen();
                // Make 50% of the arrays small
                if rng.r#gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> anyhow::Result<()> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test(blobs).await?;
        Ok(())
    }
}