Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as one byte. If it's 128 bytes or larger, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs of 128 bytes or more,
10 : //! the three bits below the high bit are reserved; only one non-zero
11 : //! bit pattern is currently in use, 0b001, which signifies compression
12 : //! with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
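To make the layout concrete: a 3-byte blob is stored with the single header byte `0x03`, while a 1000-byte uncompressed payload gets the header `0x80 0x00 0x03 0xE8`, and a 1000-byte zstd-compressed payload gets `0x90 0x00 0x03 0xE8`. The decoding sketch below is an editor's illustration, not part of this module's API; the helper name `peek_len_header` is hypothetical, and it assumes the header bytes are contiguous (the real reader below also handles a header split across a page boundary).

```rust
/// Hypothetical sketch: decode the length header at the start of `buf`.
/// Returns (payload length, compression marker bits, header size in bytes).
fn peek_len_header(buf: &[u8]) -> (usize, u8, usize) {
    let first = buf[0];
    if first < 0x80 {
        // 1-byte header: the byte itself is the length; no compression bits.
        (first as usize, 0, 1)
    } else {
        // 4-byte header: big-endian u32 with the top four bits masked off.
        // The marker bits are the high bit plus the three reserved bits
        // (0x80 = uncompressed, 0x90 = zstd).
        let len_buf = [first & 0x0f, buf[1], buf[2], buf[3]];
        (u32::from_be_bytes(len_buf) as usize, first & 0xf0, 4)
    }
}
```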
17 : use std::cmp::min;
18 : use std::io::{Error, ErrorKind};
19 :
20 : use async_compression::Level;
21 : use bytes::{BufMut, BytesMut};
22 : use pageserver_api::models::ImageCompressionAlgorithm;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
25 : use tracing::warn;
26 :
27 : use crate::context::RequestContext;
28 : use crate::page_cache::PAGE_SZ;
29 : use crate::tenant::block_io::BlockCursor;
30 : use crate::virtual_file::VirtualFile;
31 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
32 :
33 : #[derive(Copy, Clone, Debug)]
34 : pub struct CompressionInfo {
35 : pub written_compressed: bool,
36 : pub compressed_size: Option<usize>,
37 : }
38 :
39 : impl BlockCursor<'_> {
40 : /// Read a blob into a new buffer.
41 16592 : pub async fn read_blob(
42 16592 : &self,
43 16592 : offset: u64,
44 16592 : ctx: &RequestContext,
45 16592 : ) -> Result<Vec<u8>, std::io::Error> {
46 16592 : let mut buf = Vec::new();
47 16592 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
48 16592 : Ok(buf)
49 16592 : }
50 : /// Read blob into the given buffer. Any previous contents in the buffer
51 : /// are overwritten.
52 16720 : pub async fn read_blob_into_buf(
53 16720 : &self,
54 16720 : offset: u64,
55 16720 : dstbuf: &mut Vec<u8>,
56 16720 : ctx: &RequestContext,
57 16720 : ) -> Result<(), std::io::Error> {
58 16720 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
59 16720 : let mut off = (offset % PAGE_SZ as u64) as usize;
60 :
61 16720 : let mut buf = self.read_blk(blknum, ctx).await?;
62 :
63 : // peek at the first byte, to determine if it's a 1- or 4-byte length
64 16720 : let first_len_byte = buf[off];
65 16720 : let len: usize = if first_len_byte < 0x80 {
66 : // 1-byte length header
67 4448 : off += 1;
68 4448 : first_len_byte as usize
69 : } else {
70 : // 4-byte length header
71 12272 : let mut len_buf = [0u8; 4];
72 12272 : let thislen = PAGE_SZ - off;
73 12272 : if thislen < 4 {
74 : // it is split across two pages
75 0 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
76 0 : blknum += 1;
77 0 : buf = self.read_blk(blknum, ctx).await?;
78 0 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
79 0 : off = 4 - thislen;
80 12272 : } else {
81 12272 : len_buf.copy_from_slice(&buf[off..off + 4]);
82 12272 : off += 4;
83 12272 : }
84 12272 : let bit_mask = if self.read_compressed {
85 40 : !LEN_COMPRESSION_BIT_MASK
86 : } else {
87 12232 : 0x7f
88 : };
89 12272 : len_buf[0] &= bit_mask;
90 12272 : u32::from_be_bytes(len_buf) as usize
91 : };
92 16720 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
93 16720 :
94 16720 : let mut tmp_buf = Vec::new();
95 : let buf_to_write;
96 16720 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
97 16712 : if compression_bits > BYTE_UNCOMPRESSED {
98 0 : warn!("reading key above future limit ({len} bytes)");
99 16712 : }
100 16712 : buf_to_write = dstbuf;
101 16712 : None
102 8 : } else if compression_bits == BYTE_ZSTD {
103 8 : buf_to_write = &mut tmp_buf;
104 8 : Some(dstbuf)
105 : } else {
106 0 : let error = std::io::Error::new(
107 0 : std::io::ErrorKind::InvalidData,
108 0 : format!("invalid compression byte {compression_bits:x}"),
109 0 : );
110 0 : return Err(error);
111 : };
112 :
113 16720 : buf_to_write.clear();
114 16720 : buf_to_write.reserve(len);
115 16720 :
116 16720 : // Read the payload
117 16720 : let mut remain = len;
118 58872 : while remain > 0 {
119 42152 : let mut page_remain = PAGE_SZ - off;
120 42152 : if page_remain == 0 {
121 : // continue on next page
122 25512 : blknum += 1;
123 25512 : buf = self.read_blk(blknum, ctx).await?;
124 25512 : off = 0;
125 25512 : page_remain = PAGE_SZ;
126 16640 : }
127 42152 : let this_blk_len = min(remain, page_remain);
128 42152 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
129 42152 : remain -= this_blk_len;
130 42152 : off += this_blk_len;
131 : }
132 :
133 16720 : if let Some(dstbuf) = compression {
134 8 : if compression_bits == BYTE_ZSTD {
135 8 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
136 8 : decoder.write_all(buf_to_write).await?;
137 8 : decoder.flush().await?;
138 : } else {
139 0 : unreachable!("already checked above")
140 : }
141 16712 : }
142 :
143 16720 : Ok(())
144 16720 : }
145 : }
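For orientation, reading blobs back follows the same pattern as the round-trip tests at the bottom of this file. The sketch below is illustrative only (crate-internal API, error handling kept minimal); the function name `read_back` is hypothetical.

```rust
use camino::Utf8PathBuf;

// Sketch: open a file, wrap it in a BlockCursor, and read blobs by offset.
async fn read_back(
    path: Utf8PathBuf,
    offsets: &[u64],
    ctx: &RequestContext,
) -> Result<Vec<Vec<u8>>, std::io::Error> {
    let file = VirtualFile::open(path, ctx).await?;
    let reader = crate::tenant::block_io::BlockReaderRef::VirtualFile(&file);
    // `true` enables decoding of zstd-compressed blobs.
    let cursor = BlockCursor::new_with_compression(reader, true);
    let mut blobs = Vec::with_capacity(offsets.len());
    for &offset in offsets {
        blobs.push(cursor.read_blob(offset, ctx).await?);
    }
    Ok(blobs)
}
```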
146 :
147 : /// Mask of the length-header bits reserved for the long-header marker and compression
148 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
149 :
150 : /// The maximum size of blobs we support. The highest few bits of the
151 : /// four-byte length header are reserved for compression and other future uses.
152 : pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
153 :
154 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
155 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
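How these constants relate to the header layout described at the top of the file can be spelled out with compile-time assertions. This is an editor's sketch, not part of the original module; it only compiles when placed next to the constants it references.

```rust
// Sketch: the bit layout the constants above encode.
const _: () = {
    // The high bit marks a 4-byte header; the next three bits are reserved.
    assert!(BYTE_UNCOMPRESSED == 0b1000_0000);
    // Zstd sets the lowest of the three reserved bits (pattern 0b001).
    assert!(BYTE_ZSTD == 0b1001_0000);
    // The mask covers exactly the high bit plus the three reserved bits.
    assert!(LEN_COMPRESSION_BIT_MASK == 0b1111_0000);
    // Lengths must fit in the 28 bits left over in the 4-byte header.
    assert!(MAX_SUPPORTED_BLOB_LEN == (1 << 28) - 1);
};
```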
156 :
157 : /// A wrapper of `VirtualFile` that allows users to write blobs.
158 : ///
159 : /// If a `BlobWriter` is dropped, the internal buffer will be
160 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
161 : /// manually before dropping.
162 : pub struct BlobWriter<const BUFFERED: bool> {
163 : inner: VirtualFile,
164 : offset: u64,
165 : /// A buffer to save on write calls, only used if BUFFERED=true
166 : buf: Vec<u8>,
167 : /// We do tiny writes for the length headers; they need to be in an owned buffer, so we keep one here for reuse.
168 : io_buf: Option<BytesMut>,
169 : }
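A typical write path, mirroring the tests at the bottom of this file: create a `VirtualFile`, wrap it in a buffered `BlobWriter`, write blobs while remembering their offsets, and flush before taking the file back (the buffer is lost on drop otherwise). The sketch is illustrative only; `write_blobs` is a hypothetical helper and error handling is kept minimal.

```rust
use camino::Utf8Path;

// Sketch: write blobs to a new file and return it together with the offsets.
async fn write_blobs(
    path: &Utf8Path,
    blobs: &[Vec<u8>],
    ctx: &RequestContext,
) -> Result<(VirtualFile, Vec<u64>), Error> {
    let file = VirtualFile::create(path, ctx).await?;
    let mut writer = BlobWriter::<true>::new(file, 0);
    let mut offsets = Vec::with_capacity(blobs.len());
    for blob in blobs {
        let (_buf, res) = writer.write_blob(blob.clone().slice_len(), ctx).await;
        offsets.push(res?);
    }
    // `into_inner` flushes the internal buffer; dropping the writer instead
    // would silently discard any buffered bytes.
    let file = writer.into_inner(ctx).await?;
    Ok((file, offsets))
}
```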
170 :
171 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
172 4220 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
173 4220 : Self {
174 4220 : inner,
175 4220 : offset: start_offset,
176 4220 : buf: Vec::with_capacity(Self::CAPACITY),
177 4220 : io_buf: Some(BytesMut::new()),
178 4220 : }
179 4220 : }
180 :
181 4099652 : pub fn size(&self) -> u64 {
182 4099652 : self.offset
183 4099652 : }
184 :
185 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
186 :
187 : /// Writes the given buffer directly to the underlying `VirtualFile`.
188 : /// You need to make sure that the internal buffer is empty, otherwise
189 : /// data will be written in the wrong order.
190 : #[inline(always)]
191 2203844 : async fn write_all_unbuffered<Buf: IoBuf + Send>(
192 2203844 : &mut self,
193 2203844 : src_buf: FullSlice<Buf>,
194 2203844 : ctx: &RequestContext,
195 2203844 : ) -> (FullSlice<Buf>, Result<(), Error>) {
196 2203844 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
197 2203844 : let nbytes = match res {
198 2203844 : Ok(nbytes) => nbytes,
199 0 : Err(e) => return (src_buf, Err(e)),
200 : };
201 2203844 : self.offset += nbytes as u64;
202 2203844 : (src_buf, Ok(()))
203 2203844 : }
204 :
205 : #[inline(always)]
206 : /// Flushes the internal buffer to the underlying `VirtualFile`.
207 24592 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
208 24592 : let buf = std::mem::take(&mut self.buf);
209 24592 : let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
210 24592 : res?;
211 24592 : let mut buf = slice.into_raw_slice().into_inner();
212 24592 : buf.clear();
213 24592 : self.buf = buf;
214 24592 : Ok(())
215 24592 : }
216 :
217 : #[inline(always)]
218 : /// Writes as much of `src_buf` into the internal buffer as fits and returns the number of bytes copied
219 25939688 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
220 25939688 : let remaining = Self::CAPACITY - self.buf.len();
221 25939688 : let to_copy = src_buf.len().min(remaining);
222 25939688 : self.buf.extend_from_slice(&src_buf[..to_copy]);
223 25939688 : self.offset += to_copy as u64;
224 25939688 : to_copy
225 25939688 : }
226 :
227 : /// Internal, possibly buffered, write function
228 28122032 : async fn write_all<Buf: IoBuf + Send>(
229 28122032 : &mut self,
230 28122032 : src_buf: FullSlice<Buf>,
231 28122032 : ctx: &RequestContext,
232 28122032 : ) -> (FullSlice<Buf>, Result<(), Error>) {
233 28122032 : let src_buf = src_buf.into_raw_slice();
234 28122032 : let src_buf_bounds = src_buf.bounds();
235 28122032 : let restore = move |src_buf_slice: Slice<_>| {
236 25918188 : FullSlice::must_new(Slice::from_buf_bounds(
237 25918188 : src_buf_slice.into_inner(),
238 25918188 : src_buf_bounds,
239 25918188 : ))
240 25918188 : };
241 :
242 28122032 : if !BUFFERED {
243 2203384 : assert!(self.buf.is_empty());
244 2203384 : return self
245 2203384 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
246 2203384 : .await;
247 25918648 : }
248 25918648 : let remaining = Self::CAPACITY - self.buf.len();
249 25918648 : let src_buf_len = src_buf.bytes_init();
250 25918648 : if src_buf_len == 0 {
251 48 : return (restore(src_buf), Ok(()));
252 25918600 : }
253 25918600 : let mut src_buf = src_buf.slice(0..src_buf_len);
254 25918600 : // First try to copy as much as we can into the buffer
255 25918600 : if remaining > 0 {
256 25918600 : let copied = self.write_into_buffer(&src_buf);
257 25918600 : src_buf = src_buf.slice(copied..);
258 25918600 : }
259 : // Then, if the buffer is full, flush it out
260 25918600 : if self.buf.len() == Self::CAPACITY {
261 21628 : if let Err(e) = self.flush_buffer(ctx).await {
262 0 : return (restore(src_buf), Err(e));
263 21628 : }
264 25896972 : }
265 : // Finally, write the tail of src_buf:
266 : // If it wholly fits into the buffer without
267 : // completely filling it, then put it there.
268 : // If not, write it out directly.
269 25918600 : let src_buf = if !src_buf.is_empty() {
270 21548 : assert_eq!(self.buf.len(), 0);
271 21548 : if src_buf.len() < Self::CAPACITY {
272 21088 : let copied = self.write_into_buffer(&src_buf);
273 21088 : // We just verified above that src_buf fits into our internal buffer.
274 21088 : assert_eq!(copied, src_buf.len());
275 21088 : restore(src_buf)
276 : } else {
277 460 : let (src_buf, res) = self
278 460 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
279 460 : .await;
280 460 : if let Err(e) = res {
281 0 : return (src_buf, Err(e));
282 460 : }
283 460 : src_buf
284 : }
285 : } else {
286 25897052 : restore(src_buf)
287 : };
288 25918600 : (src_buf, Ok(()))
289 28122032 : }
290 :
291 : /// Write a blob of data. Returns the offset that it was written to,
292 : /// which can be used to retrieve the data later.
293 20696 : pub async fn write_blob<Buf: IoBuf + Send>(
294 20696 : &mut self,
295 20696 : srcbuf: FullSlice<Buf>,
296 20696 : ctx: &RequestContext,
297 20696 : ) -> (FullSlice<Buf>, Result<u64, Error>) {
298 20696 : let (buf, res) = self
299 20696 : .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
300 20696 : .await;
301 20696 : (buf, res.map(|(off, _compression_info)| off))
302 20696 : }
303 :
304 : /// Write a blob of data, compressing it with the given algorithm if that makes it smaller.
305 : /// Returns the offset that it was written to, which can be used to retrieve the data later.
306 14061016 : pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
307 14061016 : &mut self,
308 14061016 : srcbuf: FullSlice<Buf>,
309 14061016 : ctx: &RequestContext,
310 14061016 : algorithm: ImageCompressionAlgorithm,
311 14061016 : ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
312 14061016 : let offset = self.offset;
313 14061016 : let mut compression_info = CompressionInfo {
314 14061016 : written_compressed: false,
315 14061016 : compressed_size: None,
316 14061016 : };
317 14061016 :
318 14061016 : let len = srcbuf.len();
319 14061016 :
320 14061016 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
321 14061016 : io_buf.clear();
322 14061016 : let mut compressed_buf = None;
323 14061016 : let ((io_buf_slice, hdr_res), srcbuf) = async {
324 14061016 : if len < 128 {
325 : // Short blob. Write a 1-byte length header
326 13988280 : io_buf.put_u8(len as u8);
327 13988280 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
328 : } else {
329 : // Write a 4-byte length header
330 72736 : if len > MAX_SUPPORTED_BLOB_LEN {
331 0 : return (
332 0 : (
333 0 : io_buf.slice_len(),
334 0 : Err(Error::new(
335 0 : ErrorKind::Other,
336 0 : format!("blob too large ({len} bytes)"),
337 0 : )),
338 0 : ),
339 0 : srcbuf,
340 0 : );
341 72736 : }
342 72736 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
343 20104 : ImageCompressionAlgorithm::Zstd { level } => {
344 20104 : let mut encoder = if let Some(level) = level {
345 20104 : async_compression::tokio::write::ZstdEncoder::with_quality(
346 20104 : Vec::new(),
347 20104 : Level::Precise(level.into()),
348 20104 : )
349 : } else {
350 0 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
351 : };
352 20104 : encoder.write_all(&srcbuf[..]).await.unwrap();
353 20104 : encoder.shutdown().await.unwrap();
354 20104 : let compressed = encoder.into_inner();
355 20104 : compression_info.compressed_size = Some(compressed.len());
356 20104 : if compressed.len() < len {
357 12 : compression_info.written_compressed = true;
358 12 : let compressed_len = compressed.len();
359 12 : compressed_buf = Some(compressed);
360 12 : (BYTE_ZSTD, compressed_len, srcbuf)
361 : } else {
362 20092 : (BYTE_UNCOMPRESSED, len, srcbuf)
363 : }
364 : }
365 52632 : ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
366 : };
367 72736 : let mut len_buf = (len_written as u32).to_be_bytes();
368 72736 : assert_eq!(len_buf[0] & 0xf0, 0);
369 72736 : len_buf[0] |= high_bit_mask;
370 72736 : io_buf.extend_from_slice(&len_buf[..]);
371 72736 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
372 : }
373 14061016 : }
374 14061016 : .await;
375 14061016 : self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
376 14061016 : match hdr_res {
377 14061016 : Ok(_) => (),
378 0 : Err(e) => return (srcbuf, Err(e)),
379 : }
380 14061016 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
381 12 : let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
382 12 : (srcbuf, res)
383 : } else {
384 14061004 : self.write_all(srcbuf, ctx).await
385 : };
386 14061016 : (srcbuf, res.map(|_| (offset, compression_info)))
387 14061016 : }
388 : }
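One detail of `write_blob_maybe_compressed` worth calling out: the compressed form is kept only when it is strictly smaller than the original payload; otherwise the data is written as-is under the uncompressed marker, so compression can never grow a blob on disk. A hypothetical helper capturing just this decision (names are illustrative, not part of the module):

```rust
// Sketch: choose the header marker, stored length, and "written compressed"
// flag, given the original length and an optional compressed candidate.
fn choose_encoding(len: usize, compressed: Option<&[u8]>) -> (u8, usize, bool) {
    match compressed {
        // Keep the compressed form only if it actually saves space.
        Some(c) if c.len() < len => (BYTE_ZSTD, c.len(), true),
        // Otherwise store the payload unchanged under the uncompressed marker.
        _ => (BYTE_UNCOMPRESSED, len, false),
    }
}
```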
389 :
390 : impl BlobWriter<true> {
391 : /// Access the underlying `VirtualFile`.
392 : ///
393 : /// This function flushes the internal buffer before giving access
394 : /// to the underlying `VirtualFile`.
395 2884 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
396 2884 : self.flush_buffer(ctx).await?;
397 2884 : Ok(self.inner)
398 2884 : }
399 :
400 : /// Access the underlying `VirtualFile`.
401 : ///
402 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
403 : /// the internal buffer before giving access.
404 52 : pub fn into_inner_no_flush(self) -> VirtualFile {
405 52 : self.inner
406 52 : }
407 : }
408 :
409 : impl BlobWriter<false> {
410 : /// Access the underlying `VirtualFile`.
411 1204 : pub fn into_inner(self) -> VirtualFile {
412 1204 : self.inner
413 1204 : }
414 : }
415 :
416 : #[cfg(test)]
417 : pub(crate) mod tests {
418 : use camino::Utf8PathBuf;
419 : use camino_tempfile::Utf8TempDir;
420 : use rand::{Rng, SeedableRng};
421 :
422 : use super::*;
423 : use crate::context::DownloadBehavior;
424 : use crate::task_mgr::TaskKind;
425 : use crate::tenant::block_io::BlockReaderRef;
426 :
427 48 : async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
428 48 : round_trip_test_compressed::<BUFFERED>(blobs, false).await
429 48 : }
430 :
431 80 : pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
432 80 : blobs: &[Vec<u8>],
433 80 : compression: bool,
434 80 : ctx: &RequestContext,
435 80 : ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
436 80 : let temp_dir = camino_tempfile::tempdir()?;
437 80 : let pathbuf = temp_dir.path().join("file");
438 80 :
439 80 : // Write part (in a block so the file gets dropped)
440 80 : let mut offsets = Vec::new();
441 : {
442 80 : let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
443 80 : let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
444 24816 : for blob in blobs.iter() {
445 24816 : let (_, res) = if compression {
446 4200 : let res = wtr
447 4200 : .write_blob_maybe_compressed(
448 4200 : blob.clone().slice_len(),
449 4200 : ctx,
450 4200 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
451 4200 : )
452 4200 : .await;
453 4200 : (res.0, res.1.map(|(off, _)| off))
454 : } else {
455 20616 : wtr.write_blob(blob.clone().slice_len(), ctx).await
456 : };
457 24816 : let offs = res?;
458 24816 : offsets.push(offs);
459 : }
460 : // Write out one page worth of zeros so that we can
461 : // read again with read_blk
462 80 : let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
463 80 : let offs = res?;
464 80 : println!("Writing final blob at offs={offs}");
465 80 : wtr.flush_buffer(ctx).await?;
466 : }
467 80 : Ok((temp_dir, pathbuf, offsets))
468 80 : }
469 :
470 64 : async fn round_trip_test_compressed<const BUFFERED: bool>(
471 64 : blobs: &[Vec<u8>],
472 64 : compression: bool,
473 64 : ) -> Result<(), Error> {
474 64 : let ctx =
475 64 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
476 64 : let (_temp_dir, pathbuf, offsets) =
477 64 : write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
478 :
479 64 : let file = VirtualFile::open(pathbuf, &ctx).await?;
480 64 : let rdr = BlockReaderRef::VirtualFile(&file);
481 64 : let rdr = BlockCursor::new_with_compression(rdr, compression);
482 16576 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
483 16576 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
484 16576 : assert_eq!(
485 16576 : blob, &blob_read,
486 0 : "mismatch for idx={idx} at offset={offset}"
487 : );
488 : }
489 64 : Ok(())
490 64 : }
491 :
492 12316 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
493 12316 : let mut rng = rand::thread_rng();
494 136303808 : (0..len).map(|_| rng.r#gen()).collect::<_>()
495 12316 : }
496 :
497 : #[tokio::test]
498 4 : async fn test_one() -> Result<(), Error> {
499 4 : let blobs = &[vec![12, 21, 22]];
500 4 : round_trip_test::<false>(blobs).await?;
501 4 : round_trip_test::<true>(blobs).await?;
502 4 : Ok(())
503 4 : }
504 :
505 : #[tokio::test]
506 4 : async fn test_hello_simple() -> Result<(), Error> {
507 4 : let blobs = &[
508 4 : vec![0, 1, 2, 3],
509 4 : b"Hello, World!".to_vec(),
510 4 : Vec::new(),
511 4 : b"foobar".to_vec(),
512 4 : ];
513 4 : round_trip_test::<false>(blobs).await?;
514 4 : round_trip_test::<true>(blobs).await?;
515 4 : round_trip_test_compressed::<false>(blobs, true).await?;
516 4 : round_trip_test_compressed::<true>(blobs, true).await?;
517 4 : Ok(())
518 4 : }
519 :
520 : #[tokio::test]
521 4 : async fn test_really_big_array() -> Result<(), Error> {
522 4 : let blobs = &[
523 4 : b"test".to_vec(),
524 4 : random_array(10 * PAGE_SZ),
525 4 : b"hello".to_vec(),
526 4 : random_array(66 * PAGE_SZ),
527 4 : vec![0xf3; 24 * PAGE_SZ],
528 4 : b"foobar".to_vec(),
529 4 : ];
530 4 : round_trip_test::<false>(blobs).await?;
531 4 : round_trip_test::<true>(blobs).await?;
532 4 : round_trip_test_compressed::<false>(blobs, true).await?;
533 4 : round_trip_test_compressed::<true>(blobs, true).await?;
534 4 : Ok(())
535 4 : }
536 :
537 : #[tokio::test]
538 4 : async fn test_arrays_inc() -> Result<(), Error> {
539 4 : let blobs = (0..PAGE_SZ / 8)
540 4096 : .map(|v| random_array(v * 16))
541 4 : .collect::<Vec<_>>();
542 4 : round_trip_test::<false>(&blobs).await?;
543 4 : round_trip_test::<true>(&blobs).await?;
544 4 : Ok(())
545 4 : }
546 :
547 : #[tokio::test]
548 4 : async fn test_arrays_random_size() -> Result<(), Error> {
549 4 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
550 4 : let blobs = (0..1024)
551 4096 : .map(|_| {
552 4096 : let mut sz: u16 = rng.r#gen();
553 4096 : // Make 50% of the arrays small
554 4096 : if rng.r#gen() {
555 2064 : sz &= 63;
556 2064 : }
557 4096 : random_array(sz.into())
558 4096 : })
559 4 : .collect::<Vec<_>>();
560 4 : round_trip_test::<false>(&blobs).await?;
561 4 : round_trip_test::<true>(&blobs).await?;
562 4 : Ok(())
563 4 : }
564 :
565 : #[tokio::test]
566 4 : async fn test_arrays_page_boundary() -> Result<(), Error> {
567 4 : let blobs = &[
568 4 : random_array(PAGE_SZ - 4),
569 4 : random_array(PAGE_SZ - 4),
570 4 : random_array(PAGE_SZ - 4),
571 4 : ];
572 4 : round_trip_test::<false>(blobs).await?;
573 4 : round_trip_test::<true>(blobs).await?;
574 4 : Ok(())
575 4 : }
576 : }