Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a one byte. If it's larger than that, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs larger than 128 bits,
10 : //! we also specify three reserved bits, only one of the three bit
11 : //! patterns is currently in use (0b011) and signifies compression
12 : //! with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
17 : use async_compression::Level;
18 : use bytes::{BufMut, BytesMut};
19 : use pageserver_api::models::ImageCompressionAlgorithm;
20 : use tokio::io::AsyncWriteExt;
21 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
22 : use tracing::warn;
23 :
24 : use crate::context::RequestContext;
25 : use crate::page_cache::PAGE_SZ;
26 : use crate::tenant::block_io::BlockCursor;
27 : use crate::virtual_file::VirtualFile;
28 : use std::cmp::min;
29 : use std::io::{Error, ErrorKind};
30 :
31 : #[derive(Copy, Clone, Debug)]
32 : pub struct CompressionInfo {
33 : pub written_compressed: bool,
34 : pub compressed_size: Option<usize>,
35 : }
36 :
37 : impl<'a> BlockCursor<'a> {
38 : /// Read a blob into a new buffer.
39 2591663 : pub async fn read_blob(
40 2591663 : &self,
41 2591663 : offset: u64,
42 2591663 : ctx: &RequestContext,
43 2591663 : ) -> Result<Vec<u8>, std::io::Error> {
44 2591663 : let mut buf = Vec::new();
45 2591663 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
46 2591663 : Ok(buf)
47 2591663 : }
48 : /// Read blob into the given buffer. Any previous contents in the buffer
49 : /// are overwritten.
50 6977565 : pub async fn read_blob_into_buf(
51 6977565 : &self,
52 6977565 : offset: u64,
53 6977565 : dstbuf: &mut Vec<u8>,
54 6977565 : ctx: &RequestContext,
55 6977565 : ) -> Result<(), std::io::Error> {
56 6977565 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
57 6977565 : let mut off = (offset % PAGE_SZ as u64) as usize;
58 :
59 6977565 : let mut buf = self.read_blk(blknum, ctx).await?;
60 :
61 : // peek at the first byte, to determine if it's a 1- or 4-byte length
62 6977565 : let first_len_byte = buf[off];
63 6977565 : let len: usize = if first_len_byte < 0x80 {
64 : // 1-byte length header
65 6962745 : off += 1;
66 6962745 : first_len_byte as usize
67 : } else {
68 : // 4-byte length header
69 14820 : let mut len_buf = [0u8; 4];
70 14820 : let thislen = PAGE_SZ - off;
71 14820 : if thislen < 4 {
72 : // it is split across two pages
73 1 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
74 1 : blknum += 1;
75 1 : buf = self.read_blk(blknum, ctx).await?;
76 1 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
77 1 : off = 4 - thislen;
78 14819 : } else {
79 14819 : len_buf.copy_from_slice(&buf[off..off + 4]);
80 14819 : off += 4;
81 14819 : }
82 14820 : let bit_mask = if self.read_compressed {
83 20 : !LEN_COMPRESSION_BIT_MASK
84 : } else {
85 14800 : 0x7f
86 : };
87 14820 : len_buf[0] &= bit_mask;
88 14820 : u32::from_be_bytes(len_buf) as usize
89 : };
90 6977565 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
91 6977565 :
92 6977565 : let mut tmp_buf = Vec::new();
93 : let buf_to_write;
94 6977565 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
95 6977561 : if compression_bits > BYTE_UNCOMPRESSED {
96 0 : warn!("reading key above future limit ({len} bytes)");
97 6977561 : }
98 6977561 : buf_to_write = dstbuf;
99 6977561 : None
100 4 : } else if compression_bits == BYTE_ZSTD {
101 4 : buf_to_write = &mut tmp_buf;
102 4 : Some(dstbuf)
103 : } else {
104 0 : let error = std::io::Error::new(
105 0 : std::io::ErrorKind::InvalidData,
106 0 : format!("invalid compression byte {compression_bits:x}"),
107 0 : );
108 0 : return Err(error);
109 : };
110 :
111 6977565 : buf_to_write.clear();
112 6977565 : buf_to_write.reserve(len);
113 6977565 :
114 6977565 : // Read the payload
115 6977565 : let mut remain = len;
116 14037326 : while remain > 0 {
117 7059761 : let mut page_remain = PAGE_SZ - off;
118 7059761 : if page_remain == 0 {
119 : // continue on next page
120 82884 : blknum += 1;
121 82884 : buf = self.read_blk(blknum, ctx).await?;
122 82884 : off = 0;
123 82884 : page_remain = PAGE_SZ;
124 6976877 : }
125 7059761 : let this_blk_len = min(remain, page_remain);
126 7059761 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
127 7059761 : remain -= this_blk_len;
128 7059761 : off += this_blk_len;
129 : }
130 :
131 6977565 : if let Some(dstbuf) = compression {
132 4 : if compression_bits == BYTE_ZSTD {
133 4 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
134 4 : decoder.write_all(buf_to_write).await?;
135 4 : decoder.flush().await?;
136 : } else {
137 0 : unreachable!("already checked above")
138 : }
139 6977561 : }
140 :
141 6977565 : Ok(())
142 6977565 : }
143 : }
144 :
145 : /// Reserved bits for length and compression
146 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
147 :
148 : /// The maximum size of blobs we support. The highest few bits
149 : /// are reserved for compression and other further uses.
150 : const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
151 :
152 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
153 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
154 :
155 : /// A wrapper of `VirtualFile` that allows users to write blobs.
156 : ///
157 : /// If a `BlobWriter` is dropped, the internal buffer will be
158 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
159 : /// manually before dropping.
160 : pub struct BlobWriter<const BUFFERED: bool> {
161 : inner: VirtualFile,
162 : offset: u64,
163 : /// A buffer to save on write calls, only used if BUFFERED=true
164 : buf: Vec<u8>,
165 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
166 : io_buf: Option<BytesMut>,
167 : }
168 :
169 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
170 1690 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
171 1690 : Self {
172 1690 : inner,
173 1690 : offset: start_offset,
174 1690 : buf: Vec::with_capacity(Self::CAPACITY),
175 1690 : io_buf: Some(BytesMut::new()),
176 1690 : }
177 1690 : }
178 :
179 2025870 : pub fn size(&self) -> u64 {
180 2025870 : self.offset
181 2025870 : }
182 :
183 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
184 :
185 : /// Writes the given buffer directly to the underlying `VirtualFile`.
186 : /// You need to make sure that the internal buffer is empty, otherwise
187 : /// data will be written in wrong order.
188 : #[inline(always)]
189 1084690 : async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
190 1084690 : &mut self,
191 1084690 : src_buf: B,
192 1084690 : ctx: &RequestContext,
193 1084690 : ) -> (B::Buf, Result<(), Error>) {
194 1084690 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
195 1084690 : let nbytes = match res {
196 1084690 : Ok(nbytes) => nbytes,
197 0 : Err(e) => return (src_buf, Err(e)),
198 : };
199 1084690 : self.offset += nbytes as u64;
200 1084690 : (src_buf, Ok(()))
201 1084690 : }
202 :
203 : #[inline(always)]
204 : /// Flushes the internal buffer to the underlying `VirtualFile`.
205 10740 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
206 10740 : let buf = std::mem::take(&mut self.buf);
207 10740 : let (mut buf, res) = self.inner.write_all(buf, ctx).await;
208 10740 : res?;
209 10740 : buf.clear();
210 10740 : self.buf = buf;
211 10740 : Ok(())
212 10740 : }
213 :
214 : #[inline(always)]
215 : /// Writes as much of `src_buf` into the internal buffer as it fits
216 12943388 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
217 12943388 : let remaining = Self::CAPACITY - self.buf.len();
218 12943388 : let to_copy = src_buf.len().min(remaining);
219 12943388 : self.buf.extend_from_slice(&src_buf[..to_copy]);
220 12943388 : self.offset += to_copy as u64;
221 12943388 : to_copy
222 12943388 : }
223 :
224 : /// Internal, possibly buffered, write function
225 14018820 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
226 14018820 : &mut self,
227 14018820 : src_buf: B,
228 14018820 : ctx: &RequestContext,
229 14018820 : ) -> (B::Buf, Result<(), Error>) {
230 14018820 : if !BUFFERED {
231 1084460 : assert!(self.buf.is_empty());
232 1084460 : return self.write_all_unbuffered(src_buf, ctx).await;
233 12934360 : }
234 12934360 : let remaining = Self::CAPACITY - self.buf.len();
235 12934360 : let src_buf_len = src_buf.bytes_init();
236 12934360 : if src_buf_len == 0 {
237 24 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
238 12934336 : }
239 12934336 : let mut src_buf = src_buf.slice(0..src_buf_len);
240 12934336 : // First try to copy as much as we can into the buffer
241 12934336 : if remaining > 0 {
242 12934336 : let copied = self.write_into_buffer(&src_buf);
243 12934336 : src_buf = src_buf.slice(copied..);
244 12934336 : }
245 : // Then, if the buffer is full, flush it out
246 12934336 : if self.buf.len() == Self::CAPACITY {
247 9322 : if let Err(e) = self.flush_buffer(ctx).await {
248 0 : return (Slice::into_inner(src_buf), Err(e));
249 9322 : }
250 12925014 : }
251 : // Finally, write the tail of src_buf:
252 : // If it wholly fits into the buffer without
253 : // completely filling it, then put it there.
254 : // If not, write it out directly.
255 12934336 : let src_buf = if !src_buf.is_empty() {
256 9282 : assert_eq!(self.buf.len(), 0);
257 9282 : if src_buf.len() < Self::CAPACITY {
258 9052 : let copied = self.write_into_buffer(&src_buf);
259 9052 : // We just verified above that src_buf fits into our internal buffer.
260 9052 : assert_eq!(copied, src_buf.len());
261 9052 : Slice::into_inner(src_buf)
262 : } else {
263 230 : let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
264 230 : if let Err(e) = res {
265 0 : return (src_buf, Err(e));
266 230 : }
267 230 : src_buf
268 : }
269 : } else {
270 12925054 : Slice::into_inner(src_buf)
271 : };
272 12934336 : (src_buf, Ok(()))
273 14018820 : }
274 :
275 : /// Write a blob of data. Returns the offset that it was written to,
276 : /// which can be used to retrieve the data later.
277 10348 : pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
278 10348 : &mut self,
279 10348 : srcbuf: B,
280 10348 : ctx: &RequestContext,
281 10348 : ) -> (B::Buf, Result<u64, Error>) {
282 10348 : let (buf, res) = self
283 10348 : .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
284 4712 : .await;
285 10348 : (buf, res.map(|(off, _compression_info)| off))
286 10348 : }
287 :
288 : /// Write a blob of data. Returns the offset that it was written to,
289 : /// which can be used to retrieve the data later.
290 7009410 : pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
291 7009410 : &mut self,
292 7009410 : srcbuf: B,
293 7009410 : ctx: &RequestContext,
294 7009410 : algorithm: ImageCompressionAlgorithm,
295 7009410 : ) -> (B::Buf, Result<(u64, CompressionInfo), Error>) {
296 7009410 : let offset = self.offset;
297 7009410 : let mut compression_info = CompressionInfo {
298 7009410 : written_compressed: false,
299 7009410 : compressed_size: None,
300 7009410 : };
301 7009410 :
302 7009410 : let len = srcbuf.bytes_init();
303 7009410 :
304 7009410 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
305 7009410 : io_buf.clear();
306 7009410 : let mut compressed_buf = None;
307 7009410 : let ((io_buf, hdr_res), srcbuf) = async {
308 7009410 : if len < 128 {
309 : // Short blob. Write a 1-byte length header
310 6993046 : io_buf.put_u8(len as u8);
311 6993046 : (
312 6993046 : self.write_all(io_buf, ctx).await,
313 6993046 : srcbuf.slice_full().into_inner(),
314 : )
315 : } else {
316 : // Write a 4-byte length header
317 16364 : if len > MAX_SUPPORTED_LEN {
318 0 : return (
319 0 : (
320 0 : io_buf,
321 0 : Err(Error::new(
322 0 : ErrorKind::Other,
323 0 : format!("blob too large ({len} bytes)"),
324 0 : )),
325 0 : ),
326 0 : srcbuf.slice_full().into_inner(),
327 0 : );
328 16364 : }
329 16364 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
330 2050 : ImageCompressionAlgorithm::Zstd { level } => {
331 2050 : let mut encoder = if let Some(level) = level {
332 2050 : async_compression::tokio::write::ZstdEncoder::with_quality(
333 2050 : Vec::new(),
334 2050 : Level::Precise(level.into()),
335 2050 : )
336 : } else {
337 0 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
338 : };
339 2050 : let slice = srcbuf.slice_full();
340 2050 : encoder.write_all(&slice[..]).await.unwrap();
341 2050 : encoder.shutdown().await.unwrap();
342 2050 : let compressed = encoder.into_inner();
343 2050 : compression_info.compressed_size = Some(compressed.len());
344 2050 : if compressed.len() < len {
345 6 : compression_info.written_compressed = true;
346 6 : let compressed_len = compressed.len();
347 6 : compressed_buf = Some(compressed);
348 6 : (BYTE_ZSTD, compressed_len, slice.into_inner())
349 : } else {
350 2044 : (BYTE_UNCOMPRESSED, len, slice.into_inner())
351 : }
352 : }
353 : ImageCompressionAlgorithm::Disabled => {
354 14314 : (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
355 : }
356 : };
357 16364 : let mut len_buf = (len_written as u32).to_be_bytes();
358 16364 : assert_eq!(len_buf[0] & 0xf0, 0);
359 16364 : len_buf[0] |= high_bit_mask;
360 16364 : io_buf.extend_from_slice(&len_buf[..]);
361 16364 : (self.write_all(io_buf, ctx).await, srcbuf)
362 : }
363 7009410 : }
364 276233 : .await;
365 7009410 : self.io_buf = Some(io_buf);
366 7009410 : match hdr_res {
367 7009410 : Ok(_) => (),
368 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
369 : }
370 7009410 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
371 6 : let (_buf, res) = self.write_all(compressed_buf, ctx).await;
372 6 : (Slice::into_inner(srcbuf.slice(..)), res)
373 : } else {
374 7009404 : self.write_all(srcbuf, ctx).await
375 : };
376 7009410 : (srcbuf, res.map(|_| (offset, compression_info)))
377 7009410 : }
378 : }
379 :
380 : impl BlobWriter<true> {
381 : /// Access the underlying `VirtualFile`.
382 : ///
383 : /// This function flushes the internal buffer before giving access
384 : /// to the underlying `VirtualFile`.
385 1378 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
386 1378 : self.flush_buffer(ctx).await?;
387 1378 : Ok(self.inner)
388 1378 : }
389 :
390 : /// Access the underlying `VirtualFile`.
391 : ///
392 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
393 : /// the internal buffer before giving access.
394 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
395 0 : self.inner
396 0 : }
397 : }
398 :
399 : impl BlobWriter<false> {
400 : /// Access the underlying `VirtualFile`.
401 272 : pub fn into_inner(self) -> VirtualFile {
402 272 : self.inner
403 272 : }
404 : }
405 :
406 : #[cfg(test)]
407 : pub(crate) mod tests {
408 : use super::*;
409 : use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
410 : use camino::Utf8PathBuf;
411 : use camino_tempfile::Utf8TempDir;
412 : use rand::{Rng, SeedableRng};
413 :
414 24 : async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
415 15054 : round_trip_test_compressed::<BUFFERED>(blobs, false).await
416 24 : }
417 :
418 40 : pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
419 40 : blobs: &[Vec<u8>],
420 40 : compression: bool,
421 40 : ctx: &RequestContext,
422 40 : ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
423 40 : let temp_dir = camino_tempfile::tempdir()?;
424 40 : let pathbuf = temp_dir.path().join("file");
425 40 :
426 40 : // Write part (in block to drop the file)
427 40 : let mut offsets = Vec::new();
428 : {
429 40 : let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
430 40 : let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
431 12408 : for blob in blobs.iter() {
432 12408 : let (_, res) = if compression {
433 2100 : let res = wtr
434 2100 : .write_blob_maybe_compressed(
435 2100 : blob.clone(),
436 2100 : ctx,
437 2100 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
438 2100 : )
439 154 : .await;
440 2100 : (res.0, res.1.map(|(off, _)| off))
441 : } else {
442 10308 : wtr.write_blob(blob.clone(), ctx).await
443 : };
444 12408 : let offs = res?;
445 12408 : offsets.push(offs);
446 : }
447 : // Write out one page worth of zeros so that we can
448 : // read again with read_blk
449 40 : let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
450 40 : let offs = res?;
451 40 : println!("Writing final blob at offs={offs}");
452 40 : wtr.flush_buffer(ctx).await?;
453 : }
454 40 : Ok((temp_dir, pathbuf, offsets))
455 40 : }
456 :
457 32 : async fn round_trip_test_compressed<const BUFFERED: bool>(
458 32 : blobs: &[Vec<u8>],
459 32 : compression: bool,
460 32 : ) -> Result<(), Error> {
461 32 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
462 32 : let (_temp_dir, pathbuf, offsets) =
463 4630 : write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
464 :
465 32 : let file = VirtualFile::open(pathbuf, &ctx).await?;
466 32 : let rdr = BlockReaderRef::VirtualFile(&file);
467 32 : let rdr = BlockCursor::new_with_compression(rdr, compression);
468 8288 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
469 10619 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
470 8288 : assert_eq!(
471 8288 : blob, &blob_read,
472 0 : "mismatch for idx={idx} at offset={offset}"
473 : );
474 : }
475 32 : Ok(())
476 32 : }
477 :
478 6158 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
479 6158 : let mut rng = rand::thread_rng();
480 68151904 : (0..len).map(|_| rng.gen()).collect::<_>()
481 6158 : }
482 :
483 : #[tokio::test]
484 2 : async fn test_one() -> Result<(), Error> {
485 2 : let blobs = &[vec![12, 21, 22]];
486 8 : round_trip_test::<false>(blobs).await?;
487 4 : round_trip_test::<true>(blobs).await?;
488 2 : Ok(())
489 2 : }
490 :
491 : #[tokio::test]
492 2 : async fn test_hello_simple() -> Result<(), Error> {
493 2 : let blobs = &[
494 2 : vec![0, 1, 2, 3],
495 2 : b"Hello, World!".to_vec(),
496 2 : Vec::new(),
497 2 : b"foobar".to_vec(),
498 2 : ];
499 16 : round_trip_test::<false>(blobs).await?;
500 7 : round_trip_test::<true>(blobs).await?;
501 15 : round_trip_test_compressed::<false>(blobs, true).await?;
502 7 : round_trip_test_compressed::<true>(blobs, true).await?;
503 2 : Ok(())
504 2 : }
505 :
506 : #[tokio::test]
507 2 : async fn test_really_big_array() -> Result<(), Error> {
508 2 : let blobs = &[
509 2 : b"test".to_vec(),
510 2 : random_array(10 * PAGE_SZ),
511 2 : b"hello".to_vec(),
512 2 : random_array(66 * PAGE_SZ),
513 2 : vec![0xf3; 24 * PAGE_SZ],
514 2 : b"foobar".to_vec(),
515 2 : ];
516 124 : round_trip_test::<false>(blobs).await?;
517 116 : round_trip_test::<true>(blobs).await?;
518 100 : round_trip_test_compressed::<false>(blobs, true).await?;
519 89 : round_trip_test_compressed::<true>(blobs, true).await?;
520 2 : Ok(())
521 2 : }
522 :
523 : #[tokio::test]
524 2 : async fn test_arrays_inc() -> Result<(), Error> {
525 2 : let blobs = (0..PAGE_SZ / 8)
526 2048 : .map(|v| random_array(v * 16))
527 2 : .collect::<Vec<_>>();
528 4162 : round_trip_test::<false>(&blobs).await?;
529 2212 : round_trip_test::<true>(&blobs).await?;
530 2 : Ok(())
531 2 : }
532 :
533 : #[tokio::test]
534 2 : async fn test_arrays_random_size() -> Result<(), Error> {
535 2 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
536 2 : let blobs = (0..1024)
537 2048 : .map(|_| {
538 2048 : let mut sz: u16 = rng.gen();
539 2048 : // Make 50% of the arrays small
540 2048 : if rng.gen() {
541 1032 : sz &= 63;
542 1032 : }
543 2048 : random_array(sz.into())
544 2048 : })
545 2 : .collect::<Vec<_>>();
546 5106 : round_trip_test::<false>(&blobs).await?;
547 3279 : round_trip_test::<true>(&blobs).await?;
548 2 : Ok(())
549 2 : }
550 :
551 : #[tokio::test]
552 2 : async fn test_arrays_page_boundary() -> Result<(), Error> {
553 2 : let blobs = &[
554 2 : random_array(PAGE_SZ - 4),
555 2 : random_array(PAGE_SZ - 4),
556 2 : random_array(PAGE_SZ - 4),
557 2 : ];
558 14 : round_trip_test::<false>(blobs).await?;
559 6 : round_trip_test::<true>(blobs).await?;
560 2 : Ok(())
561 2 : }
562 : }
|