Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as one byte. If it's 128 bytes or larger, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs of 128 bytes or more,
10 : //! the header also carries three reserved bits; only one non-zero
11 : //! bit pattern is currently in use (0b001), and it signifies
12 : //! compression with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
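//! As an illustrative sketch (not an API of this module), the length header
//! can be decoded as follows; `decode_len_header` is a hypothetical helper,
//! and the compression bits follow the `BYTE_UNCOMPRESSED`/`BYTE_ZSTD`
//! constants defined further down in this file:
//!
//! ```
//! /// Returns (header size in bytes, payload length, compression bits).
//! fn decode_len_header(buf: &[u8]) -> (usize, usize, u8) {
//!     if buf[0] < 0x80 {
//!         // 1-byte header: the byte is the length itself.
//!         (1, buf[0] as usize, 0)
//!     } else {
//!         // 4-byte header: big-endian length in the low 28 bits,
//!         // compression bits in the high nibble.
//!         let len_buf = [buf[0] & 0x0f, buf[1], buf[2], buf[3]];
//!         (4, u32::from_be_bytes(len_buf) as usize, buf[0] & 0xf0)
//!     }
//! }
//!
//! // 3-byte blob: a single 0x03 length byte, then the payload.
//! assert_eq!(decode_len_header(&[0x03, b'a', b'b', b'c']), (1, 3, 0));
//! // 1000-byte uncompressed payload: 0x80 | big-endian 1000.
//! assert_eq!(decode_len_header(&[0x80, 0x00, 0x03, 0xE8]), (4, 1000, 0x80));
//! ```
//!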
17 : use async_compression::Level;
18 : use bytes::{BufMut, BytesMut};
19 : use pageserver_api::models::ImageCompressionAlgorithm;
20 : use tokio::io::AsyncWriteExt;
21 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
22 : use tracing::warn;
23 :
24 : use crate::context::RequestContext;
25 : use crate::page_cache::PAGE_SZ;
26 : use crate::tenant::block_io::BlockCursor;
27 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
28 : use crate::virtual_file::VirtualFile;
29 : use std::cmp::min;
30 : use std::io::{Error, ErrorKind};
31 :
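/// Result of a (possibly compressed) blob write: records whether the payload
/// was actually written compressed, and the compressed size if compression
/// was attempted.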
32 : #[derive(Copy, Clone, Debug)]
33 : pub struct CompressionInfo {
34 : pub written_compressed: bool,
35 : pub compressed_size: Option<usize>,
36 : }
37 :
38 : impl<'a> BlockCursor<'a> {
39 : /// Read a blob into a new buffer.
40 24888 : pub async fn read_blob(
41 24888 : &self,
42 24888 : offset: u64,
43 24888 : ctx: &RequestContext,
44 24888 : ) -> Result<Vec<u8>, std::io::Error> {
45 24888 : let mut buf = Vec::new();
46 31863 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
47 24888 : Ok(buf)
48 24888 : }
49 : /// Read a blob into the given buffer. Any previous contents in the buffer
50 : /// are overwritten.
51 25080 : pub async fn read_blob_into_buf(
52 25080 : &self,
53 25080 : offset: u64,
54 25080 : dstbuf: &mut Vec<u8>,
55 25080 : ctx: &RequestContext,
56 25080 : ) -> Result<(), std::io::Error> {
57 25080 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
58 25080 : let mut off = (offset % PAGE_SZ as u64) as usize;
59 :
60 25080 : let mut buf = self.read_blk(blknum, ctx).await?;
61 :
62 : // peek at the first byte, to determine if it's a 1- or 4-byte length
63 25080 : let first_len_byte = buf[off];
64 25080 : let len: usize = if first_len_byte < 0x80 {
65 : // 1-byte length header
66 6672 : off += 1;
67 6672 : first_len_byte as usize
68 : } else {
69 : // 4-byte length header
70 18408 : let mut len_buf = [0u8; 4];
71 18408 : let thislen = PAGE_SZ - off;
72 18408 : if thislen < 4 {
73 : // it is split across two pages
74 0 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
75 0 : blknum += 1;
76 0 : buf = self.read_blk(blknum, ctx).await?;
77 0 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
78 0 : off = 4 - thislen;
79 18408 : } else {
80 18408 : len_buf.copy_from_slice(&buf[off..off + 4]);
81 18408 : off += 4;
82 18408 : }
83 18408 : let bit_mask = if self.read_compressed {
84 60 : !LEN_COMPRESSION_BIT_MASK
85 : } else {
86 18348 : 0x7f
87 : };
88 18408 : len_buf[0] &= bit_mask;
89 18408 : u32::from_be_bytes(len_buf) as usize
90 : };
91 25080 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
92 25080 :
93 25080 : let mut tmp_buf = Vec::new();
94 : let buf_to_write;
95 25080 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
96 25068 : if compression_bits > BYTE_UNCOMPRESSED {
97 0 : warn!("reading key above future limit ({len} bytes)");
98 25068 : }
99 25068 : buf_to_write = dstbuf;
100 25068 : None
101 12 : } else if compression_bits == BYTE_ZSTD {
102 12 : buf_to_write = &mut tmp_buf;
103 12 : Some(dstbuf)
104 : } else {
105 0 : let error = std::io::Error::new(
106 0 : std::io::ErrorKind::InvalidData,
107 0 : format!("invalid compression byte {compression_bits:x}"),
108 0 : );
109 0 : return Err(error);
110 : };
111 :
112 25080 : buf_to_write.clear();
113 25080 : buf_to_write.reserve(len);
114 25080 :
115 25080 : // Read the payload
116 25080 : let mut remain = len;
117 88308 : while remain > 0 {
118 63228 : let mut page_remain = PAGE_SZ - off;
119 63228 : if page_remain == 0 {
120 : // continue on next page
121 38268 : blknum += 1;
122 38268 : buf = self.read_blk(blknum, ctx).await?;
123 38268 : off = 0;
124 38268 : page_remain = PAGE_SZ;
125 24960 : }
126 63228 : let this_blk_len = min(remain, page_remain);
127 63228 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
128 63228 : remain -= this_blk_len;
129 63228 : off += this_blk_len;
130 : }
131 :
132 25080 : if let Some(dstbuf) = compression {
133 12 : if compression_bits == BYTE_ZSTD {
134 12 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
135 12 : decoder.write_all(buf_to_write).await?;
136 12 : decoder.flush().await?;
137 : } else {
138 0 : unreachable!("already checked above")
139 : }
140 25068 : }
141 :
142 25080 : Ok(())
143 25080 : }
144 : }
145 :
146 : /// Reserved bits for length and compression
147 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
148 :
149 : /// The maximum size of blobs we support. The highest few bits
150 : /// are reserved for compression and other future uses.
151 : pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
152 :
153 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
154 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
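// For orientation (illustrative, not exhaustive): with the constants above,
// the first byte of a 4-byte header is 0b1000_XXXX for an uncompressed blob
// (BYTE_UNCOMPRESSED) and 0b1001_XXXX for a zstd-compressed one (BYTE_ZSTD);
// the remaining 28 bits hold the payload length, which is why
// MAX_SUPPORTED_BLOB_LEN is 0x0fff_ffff.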
155 :
156 : /// A wrapper of `VirtualFile` that allows users to write blobs.
157 : ///
158 : /// If a `BlobWriter` is dropped, the internal buffer will be
159 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
160 : /// manually before dropping.
161 : pub struct BlobWriter<const BUFFERED: bool> {
162 : inner: VirtualFile,
163 : offset: u64,
164 : /// A buffer to save on write calls, only used if BUFFERED=true
165 : buf: Vec<u8>,
166 : /// We do tiny writes for the length headers; they need to be in an owned buffer.
167 : io_buf: Option<BytesMut>,
168 : }
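// A minimal usage sketch (assuming a `VirtualFile` `file` and a
// `RequestContext` `ctx` are already in scope; error handling elided):
//
//     let mut wtr = BlobWriter::<true>::new(file, 0);
//     let (_buf, res) = wtr.write_blob(b"hello".to_vec().slice_len(), &ctx).await;
//     let offset = res?;
//     wtr.flush_buffer(&ctx).await?; // flush before dropping, see above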
169 :
170 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
171 5874 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
172 5874 : Self {
173 5874 : inner,
174 5874 : offset: start_offset,
175 5874 : buf: Vec::with_capacity(Self::CAPACITY),
176 5874 : io_buf: Some(BytesMut::new()),
177 5874 : }
178 5874 : }
179 :
180 6148374 : pub fn size(&self) -> u64 {
181 6148374 : self.offset
182 6148374 : }
183 :
184 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
185 :
186 : /// Writes the given buffer directly to the underlying `VirtualFile`.
187 : /// You need to make sure that the internal buffer is empty, otherwise
188 : /// data will be written in the wrong order.
189 : #[inline(always)]
190 3302838 : async fn write_all_unbuffered<Buf: IoBuf + Send>(
191 3302838 : &mut self,
192 3302838 : src_buf: FullSlice<Buf>,
193 3302838 : ctx: &RequestContext,
194 3302838 : ) -> (FullSlice<Buf>, Result<(), Error>) {
195 3302838 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
196 3302838 : let nbytes = match res {
197 3302838 : Ok(nbytes) => nbytes,
198 0 : Err(e) => return (src_buf, Err(e)),
199 : };
200 3302838 : self.offset += nbytes as u64;
201 3302838 : (src_buf, Ok(()))
202 3302838 : }
203 :
204 : #[inline(always)]
205 : /// Flushes the internal buffer to the underlying `VirtualFile`.
206 36774 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
207 36774 : let buf = std::mem::take(&mut self.buf);
208 36774 : let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
209 36774 : res?;
210 36774 : let mut buf = slice.into_raw_slice().into_inner();
211 36774 : buf.clear();
212 36774 : self.buf = buf;
213 36774 : Ok(())
214 36774 : }
215 :
216 : #[inline(always)]
217 : /// Writes as much of `src_buf` into the internal buffer as will fit
218 38908596 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
219 38908596 : let remaining = Self::CAPACITY - self.buf.len();
220 38908596 : let to_copy = src_buf.len().min(remaining);
221 38908596 : self.buf.extend_from_slice(&src_buf[..to_copy]);
222 38908596 : self.offset += to_copy as u64;
223 38908596 : to_copy
224 38908596 : }
225 :
226 : /// Internal, possibly buffered, write function
227 42179184 : async fn write_all<Buf: IoBuf + Send>(
228 42179184 : &mut self,
229 42179184 : src_buf: FullSlice<Buf>,
230 42179184 : ctx: &RequestContext,
231 42179184 : ) -> (FullSlice<Buf>, Result<(), Error>) {
232 42179184 : let src_buf = src_buf.into_raw_slice();
233 42179184 : let src_buf_bounds = src_buf.bounds();
234 42179184 : let restore = move |src_buf_slice: Slice<_>| {
235 38876346 : FullSlice::must_new(Slice::from_buf_bounds(
236 38876346 : src_buf_slice.into_inner(),
237 38876346 : src_buf_bounds,
238 38876346 : ))
239 38876346 : };
240 :
241 42179184 : if !BUFFERED {
242 3302148 : assert!(self.buf.is_empty());
243 3302148 : return self
244 3302148 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
245 1677191 : .await;
246 38877036 : }
247 38877036 : let remaining = Self::CAPACITY - self.buf.len();
248 38877036 : let src_buf_len = src_buf.bytes_init();
249 38877036 : if src_buf_len == 0 {
250 72 : return (restore(src_buf), Ok(()));
251 38876964 : }
252 38876964 : let mut src_buf = src_buf.slice(0..src_buf_len);
253 38876964 : // First try to copy as much as we can into the buffer
254 38876964 : if remaining > 0 {
255 38876964 : let copied = self.write_into_buffer(&src_buf);
256 38876964 : src_buf = src_buf.slice(copied..);
257 38876964 : }
258 : // Then, if the buffer is full, flush it out
259 38876964 : if self.buf.len() == Self::CAPACITY {
260 32442 : if let Err(e) = self.flush_buffer(ctx).await {
261 0 : return (restore(src_buf), Err(e));
262 32442 : }
263 38844522 : }
264 : // Finally, write the tail of src_buf:
265 : // If it wholly fits into the buffer without
266 : // completely filling it, then put it there.
267 : // If not, write it out directly.
268 38876964 : let src_buf = if !src_buf.is_empty() {
269 32322 : assert_eq!(self.buf.len(), 0);
270 32322 : if src_buf.len() < Self::CAPACITY {
271 31632 : let copied = self.write_into_buffer(&src_buf);
272 31632 : // We just verified above that src_buf fits into our internal buffer.
273 31632 : assert_eq!(copied, src_buf.len());
274 31632 : restore(src_buf)
275 : } else {
276 690 : let (src_buf, res) = self
277 690 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
278 345 : .await;
279 690 : if let Err(e) = res {
280 0 : return (src_buf, Err(e));
281 690 : }
282 690 : src_buf
283 : }
284 : } else {
285 38844642 : restore(src_buf)
286 : };
287 38876964 : (src_buf, Ok(()))
288 42179184 : }
289 :
290 : /// Write a blob of data. Returns the offset that it was written to,
291 : /// which can be used to retrieve the data later.
292 31044 : pub async fn write_blob<Buf: IoBuf + Send>(
293 31044 : &mut self,
294 31044 : srcbuf: FullSlice<Buf>,
295 31044 : ctx: &RequestContext,
296 31044 : ) -> (FullSlice<Buf>, Result<u64, Error>) {
297 31044 : let (buf, res) = self
298 31044 : .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
299 14136 : .await;
300 31044 : (buf, res.map(|(off, _compression_info)| off))
301 31044 : }
302 :
303 : /// Write a blob of data. Returns the offset that it was written to,
304 : /// which can be used to retrieve the data later.
305 21089592 : pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
306 21089592 : &mut self,
307 21089592 : srcbuf: FullSlice<Buf>,
308 21089592 : ctx: &RequestContext,
309 21089592 : algorithm: ImageCompressionAlgorithm,
310 21089592 : ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
311 21089592 : let offset = self.offset;
312 21089592 : let mut compression_info = CompressionInfo {
313 21089592 : written_compressed: false,
314 21089592 : compressed_size: None,
315 21089592 : };
316 21089592 :
317 21089592 : let len = srcbuf.len();
318 21089592 :
319 21089592 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
320 21089592 : io_buf.clear();
321 21089592 : let mut compressed_buf = None;
322 21089592 : let ((io_buf_slice, hdr_res), srcbuf) = async {
323 21089592 : if len < 128 {
324 : // Short blob. Write a 1-byte length header
325 20980488 : io_buf.put_u8(len as u8);
326 20980488 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
327 : } else {
328 : // Write a 4-byte length header
329 109104 : if len > MAX_SUPPORTED_BLOB_LEN {
330 0 : return (
331 0 : (
332 0 : io_buf.slice_len(),
333 0 : Err(Error::new(
334 0 : ErrorKind::Other,
335 0 : format!("blob too large ({len} bytes)"),
336 0 : )),
337 0 : ),
338 0 : srcbuf,
339 0 : );
340 109104 : }
341 109104 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
342 30156 : ImageCompressionAlgorithm::Zstd { level } => {
343 30156 : let mut encoder = if let Some(level) = level {
344 30156 : async_compression::tokio::write::ZstdEncoder::with_quality(
345 30156 : Vec::new(),
346 30156 : Level::Precise(level.into()),
347 30156 : )
348 : } else {
349 0 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
350 : };
351 30156 : encoder.write_all(&srcbuf[..]).await.unwrap();
352 30156 : encoder.shutdown().await.unwrap();
353 30156 : let compressed = encoder.into_inner();
354 30156 : compression_info.compressed_size = Some(compressed.len());
355 30156 : if compressed.len() < len {
356 18 : compression_info.written_compressed = true;
357 18 : let compressed_len = compressed.len();
358 18 : compressed_buf = Some(compressed);
359 18 : (BYTE_ZSTD, compressed_len, srcbuf)
360 : } else {
361 30138 : (BYTE_UNCOMPRESSED, len, srcbuf)
362 : }
363 : }
364 78948 : ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
365 : };
366 109104 : let mut len_buf = (len_written as u32).to_be_bytes();
367 109104 : assert_eq!(len_buf[0] & 0xf0, 0);
368 109104 : len_buf[0] |= high_bit_mask;
369 109104 : io_buf.extend_from_slice(&len_buf[..]);
370 109104 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
371 : }
372 21089592 : }
373 841160 : .await;
374 21089592 : self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
375 21089592 : match hdr_res {
376 21089592 : Ok(_) => (),
377 0 : Err(e) => return (srcbuf, Err(e)),
378 : }
379 21089592 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
380 18 : let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
381 18 : (srcbuf, res)
382 : } else {
383 21089574 : self.write_all(srcbuf, ctx).await
384 : };
385 21089592 : (srcbuf, res.map(|_| (offset, compression_info)))
386 21089592 : }
387 : }
388 :
389 : impl BlobWriter<true> {
390 : /// Access the underlying `VirtualFile`.
391 : ///
392 : /// This function flushes the internal buffer before giving access
393 : /// to the underlying `VirtualFile`.
394 4212 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
395 4212 : self.flush_buffer(ctx).await?;
396 4212 : Ok(self.inner)
397 4212 : }
398 :
399 : /// Access the underlying `VirtualFile`.
400 : ///
401 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
402 : /// the internal buffer before giving access.
403 36 : pub fn into_inner_no_flush(self) -> VirtualFile {
404 36 : self.inner
405 36 : }
406 : }
407 :
408 : impl BlobWriter<false> {
409 : /// Access the underlying `VirtualFile`.
410 1506 : pub fn into_inner(self) -> VirtualFile {
411 1506 : self.inner
412 1506 : }
413 : }
414 :
415 : #[cfg(test)]
416 : pub(crate) mod tests {
417 : use super::*;
418 : use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
419 : use camino::Utf8PathBuf;
420 : use camino_tempfile::Utf8TempDir;
421 : use rand::{Rng, SeedableRng};
422 :
423 72 : async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
424 45162 : round_trip_test_compressed::<BUFFERED>(blobs, false).await
425 72 : }
426 :
427 120 : pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
428 120 : blobs: &[Vec<u8>],
429 120 : compression: bool,
430 120 : ctx: &RequestContext,
431 120 : ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
432 120 : let temp_dir = camino_tempfile::tempdir()?;
433 120 : let pathbuf = temp_dir.path().join("file");
434 120 :
435 120 : // Write part (in block to drop the file)
436 120 : let mut offsets = Vec::new();
437 : {
438 120 : let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
439 120 : let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
440 37224 : for blob in blobs.iter() {
441 37224 : let (_, res) = if compression {
442 6300 : let res = wtr
443 6300 : .write_blob_maybe_compressed(
444 6300 : blob.clone().slice_len(),
445 6300 : ctx,
446 6300 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
447 6300 : )
448 462 : .await;
449 6300 : (res.0, res.1.map(|(off, _)| off))
450 : } else {
451 30924 : wtr.write_blob(blob.clone().slice_len(), ctx).await
452 : };
453 37224 : let offs = res?;
454 37224 : offsets.push(offs);
455 : }
456 : // Write out one page worth of zeros so that we can
457 : // read again with read_blk
458 120 : let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
459 120 : let offs = res?;
460 120 : println!("Writing final blob at offs={offs}");
461 120 : wtr.flush_buffer(ctx).await?;
462 : }
463 120 : Ok((temp_dir, pathbuf, offsets))
464 120 : }
465 :
466 96 : async fn round_trip_test_compressed<const BUFFERED: bool>(
467 96 : blobs: &[Vec<u8>],
468 96 : compression: bool,
469 96 : ) -> Result<(), Error> {
470 96 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
471 96 : let (_temp_dir, pathbuf, offsets) =
472 13890 : write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
473 :
474 96 : let file = VirtualFile::open(pathbuf, &ctx).await?;
475 96 : let rdr = BlockReaderRef::VirtualFile(&file);
476 96 : let rdr = BlockCursor::new_with_compression(rdr, compression);
477 24864 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
478 31857 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
479 24864 : assert_eq!(
480 24864 : blob, &blob_read,
481 0 : "mismatch for idx={idx} at offset={offset}"
482 : );
483 : }
484 96 : Ok(())
485 96 : }
486 :
487 18474 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
488 18474 : let mut rng = rand::thread_rng();
489 204455712 : (0..len).map(|_| rng.gen()).collect::<_>()
490 18474 : }
491 :
492 : #[tokio::test]
493 6 : async fn test_one() -> Result<(), Error> {
494 6 : let blobs = &[vec![12, 21, 22]];
495 24 : round_trip_test::<false>(blobs).await?;
496 12 : round_trip_test::<true>(blobs).await?;
497 6 : Ok(())
498 6 : }
499 :
500 : #[tokio::test]
501 6 : async fn test_hello_simple() -> Result<(), Error> {
502 6 : let blobs = &[
503 6 : vec![0, 1, 2, 3],
504 6 : b"Hello, World!".to_vec(),
505 6 : Vec::new(),
506 6 : b"foobar".to_vec(),
507 6 : ];
508 48 : round_trip_test::<false>(blobs).await?;
509 21 : round_trip_test::<true>(blobs).await?;
510 45 : round_trip_test_compressed::<false>(blobs, true).await?;
511 21 : round_trip_test_compressed::<true>(blobs, true).await?;
512 6 : Ok(())
513 6 : }
514 :
515 : #[tokio::test]
516 6 : async fn test_really_big_array() -> Result<(), Error> {
517 6 : let blobs = &[
518 6 : b"test".to_vec(),
519 6 : random_array(10 * PAGE_SZ),
520 6 : b"hello".to_vec(),
521 6 : random_array(66 * PAGE_SZ),
522 6 : vec![0xf3; 24 * PAGE_SZ],
523 6 : b"foobar".to_vec(),
524 6 : ];
525 372 : round_trip_test::<false>(blobs).await?;
526 348 : round_trip_test::<true>(blobs).await?;
527 300 : round_trip_test_compressed::<false>(blobs, true).await?;
528 267 : round_trip_test_compressed::<true>(blobs, true).await?;
529 6 : Ok(())
530 6 : }
531 :
532 : #[tokio::test]
533 6 : async fn test_arrays_inc() -> Result<(), Error> {
534 6 : let blobs = (0..PAGE_SZ / 8)
535 6144 : .map(|v| random_array(v * 16))
536 6 : .collect::<Vec<_>>();
537 12486 : round_trip_test::<false>(&blobs).await?;
538 6636 : round_trip_test::<true>(&blobs).await?;
539 6 : Ok(())
540 6 : }
541 :
542 : #[tokio::test]
543 6 : async fn test_arrays_random_size() -> Result<(), Error> {
544 6 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
545 6 : let blobs = (0..1024)
546 6144 : .map(|_| {
547 6144 : let mut sz: u16 = rng.gen();
548 6144 : // Make 50% of the arrays small
549 6144 : if rng.gen() {
550 3096 : sz &= 63;
551 3096 : }
552 6144 : random_array(sz.into())
553 6144 : })
554 6 : .collect::<Vec<_>>();
555 15318 : round_trip_test::<false>(&blobs).await?;
556 9837 : round_trip_test::<true>(&blobs).await?;
557 6 : Ok(())
558 6 : }
559 :
560 : #[tokio::test]
561 6 : async fn test_arrays_page_boundary() -> Result<(), Error> {
562 6 : let blobs = &[
563 6 : random_array(PAGE_SZ - 4),
564 6 : random_array(PAGE_SZ - 4),
565 6 : random_array(PAGE_SZ - 4),
566 6 : ];
567 42 : round_trip_test::<false>(blobs).await?;
568 18 : round_trip_test::<true>(blobs).await?;
569 6 : Ok(())
570 6 : }
571 : }
|