Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as one byte. If it's 128 bytes or larger, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs of 128 bytes or larger,
10 : //! the three bits after the high bit are reserved; only one of those
11 : //! bit patterns is currently in use (0b001, see `BYTE_ZSTD` below),
12 : //! and it signifies compression with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
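//! As a worked example of the encoding above (illustrative values only):
//! a 13-byte blob gets the single header byte 0x0d; a 1000-byte blob stored
//! uncompressed gets the four header bytes 0x80 0x00 0x03 0xe8; the same
//! payload stored zstd-compressed down to 300 bytes would get
//! 0x90 0x00 0x01 0x2c (high bit set, compression bits 0b001).
//!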
17 : use async_compression::Level;
18 : use bytes::{BufMut, BytesMut};
19 : use pageserver_api::models::ImageCompressionAlgorithm;
20 : use tokio::io::AsyncWriteExt;
21 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
22 : use tracing::warn;
23 :
24 : use crate::context::RequestContext;
25 : use crate::page_cache::PAGE_SZ;
26 : use crate::tenant::block_io::BlockCursor;
27 : use crate::virtual_file::VirtualFile;
28 : use std::cmp::min;
29 : use std::io::{Error, ErrorKind};
30 :
31 : impl<'a> BlockCursor<'a> {
32 : /// Read a blob into a new buffer.
33 2598546 : pub async fn read_blob(
34 2598546 : &self,
35 2598546 : offset: u64,
36 2598546 : ctx: &RequestContext,
37 2598546 : ) -> Result<Vec<u8>, std::io::Error> {
38 2598546 : let mut buf = Vec::new();
39 2598546 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
40 2598546 : Ok(buf)
41 2598546 : }
42 : /// Read a blob into the given buffer. Any previous contents in the buffer
43 : /// are overwritten.
44 7106080 : pub async fn read_blob_into_buf(
45 7106080 : &self,
46 7106080 : offset: u64,
47 7106080 : dstbuf: &mut Vec<u8>,
48 7106080 : ctx: &RequestContext,
49 7106080 : ) -> Result<(), std::io::Error> {
50 7106080 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
51 7106080 : let mut off = (offset % PAGE_SZ as u64) as usize;
52 :
53 7106080 : let mut buf = self.read_blk(blknum, ctx).await?;
54 :
55 : // peek at the first byte, to determine if it's a 1- or 4-byte length
56 7106080 : let first_len_byte = buf[off];
57 7106080 : let len: usize = if first_len_byte < 0x80 {
58 : // 1-byte length header
59 7091258 : off += 1;
60 7091258 : first_len_byte as usize
61 : } else {
62 : // 4-byte length header
63 14822 : let mut len_buf = [0u8; 4];
64 14822 : let thislen = PAGE_SZ - off;
65 14822 : if thislen < 4 {
66 : // it is split across two pages
67 1 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
68 1 : blknum += 1;
69 1 : buf = self.read_blk(blknum, ctx).await?;
70 1 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
71 1 : off = 4 - thislen;
72 14821 : } else {
73 14821 : len_buf.copy_from_slice(&buf[off..off + 4]);
74 14821 : off += 4;
75 14821 : }
76 14822 : let bit_mask = if self.read_compressed {
77 22 : !LEN_COMPRESSION_BIT_MASK
78 : } else {
79 14800 : 0x7f
80 : };
81 14822 : len_buf[0] &= bit_mask;
82 14822 : u32::from_be_bytes(len_buf) as usize
83 : };
84 7106080 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
85 7106080 :
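        // Decide where to read the payload: straight into `dstbuf` when it is
        // stored uncompressed (or when compressed reads are disabled), or into
        // `tmp_buf` first so that it can be zstd-decompressed into `dstbuf` below.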
86 7106080 : let mut tmp_buf = Vec::new();
87 : let buf_to_write;
88 7106080 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
89 7106076 : if compression_bits > BYTE_UNCOMPRESSED {
90 0 : warn!("reading key above future limit ({len} bytes)");
91 7106076 : }
92 7106076 : buf_to_write = dstbuf;
93 7106076 : None
94 4 : } else if compression_bits == BYTE_ZSTD {
95 4 : buf_to_write = &mut tmp_buf;
96 4 : Some(dstbuf)
97 : } else {
98 0 : let error = std::io::Error::new(
99 0 : std::io::ErrorKind::InvalidData,
100 0 : format!("invalid compression byte {compression_bits:x}"),
101 0 : );
102 0 : return Err(error);
103 : };
104 :
105 7106080 : buf_to_write.clear();
106 7106080 : buf_to_write.reserve(len);
107 7106080 :
108 7106080 : // Read the payload
109 7106080 : let mut remain = len;
110 14295618 : while remain > 0 {
111 7189538 : let mut page_remain = PAGE_SZ - off;
112 7189538 : if page_remain == 0 {
113 : // continue on next page
114 84139 : blknum += 1;
115 84139 : buf = self.read_blk(blknum, ctx).await?;
116 84139 : off = 0;
117 84139 : page_remain = PAGE_SZ;
118 7105399 : }
119 7189538 : let this_blk_len = min(remain, page_remain);
120 7189538 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
121 7189538 : remain -= this_blk_len;
122 7189538 : off += this_blk_len;
123 : }
124 :
125 7106080 : if let Some(dstbuf) = compression {
126 4 : if compression_bits == BYTE_ZSTD {
127 4 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
128 4 : decoder.write_all(buf_to_write).await?;
129 4 : decoder.flush().await?;
130 : } else {
131 0 : unreachable!("already checked above")
132 : }
133 7106076 : }
134 :
135 7106080 : Ok(())
136 7106080 : }
137 : }
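// A minimal read-path sketch (not part of the original file), following the
// pattern used by the round-trip tests at the bottom of this file; `file`,
// `offset`, and `ctx` are assumed to already exist in the caller:
//
//     let rdr = BlockReaderRef::VirtualFile(&file);
//     let cursor = BlockCursor::new_with_compression(rdr, true);
//     let bytes = cursor.read_blob(offset, &ctx).await?;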
138 :
139 : /// Reserved bits for length and compression
140 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
141 :
142 : /// The maximum size of blobs we support. The highest few bits
143 : /// are reserved for compression and other further uses.
144 : const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
145 :
146 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
147 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
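// Illustrative sketch (not part of the original file): a hypothetical helper
// showing how the first header byte could be classified with the constants
// above. Values below 0x80 are a complete 1-byte length; otherwise the top
// nibble carries the high bit plus the compression bits.
#[allow(dead_code)]
fn describe_first_header_byte(first_byte: u8) -> &'static str {
    if first_byte < 0x80 {
        "1-byte length header, uncompressed payload"
    } else {
        match first_byte & LEN_COMPRESSION_BIT_MASK {
            BYTE_UNCOMPRESSED => "4-byte length header, uncompressed payload",
            BYTE_ZSTD => "4-byte length header, zstd-compressed payload",
            _ => "4-byte length header, reserved compression bits",
        }
    }
}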
148 :
149 : /// A wrapper of `VirtualFile` that allows users to write blobs.
150 : ///
151 : /// If a `BlobWriter` is dropped, the internal buffer will be
152 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
153 : /// manually before dropping.
154 : pub struct BlobWriter<const BUFFERED: bool> {
155 : inner: VirtualFile,
156 : offset: u64,
157 : /// A buffer to save on write calls, only used if BUFFERED=true
158 : buf: Vec<u8>,
159 : /// We do tiny writes for the length headers; they need to be in an owned buffer.
160 : io_buf: Option<BytesMut>,
161 : }
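// A minimal write-path sketch (not part of the original file), mirroring
// `write_maybe_compressed` in the tests below; `path` and `ctx` are assumed
// to already exist in the caller:
//
//     let file = VirtualFile::create(path, ctx).await?;
//     let mut wtr = BlobWriter::<true>::new(file, 0);
//     let (_, res) = wtr.write_blob(b"Hello, World!".to_vec(), ctx).await;
//     let offset = res?;
//     wtr.flush_buffer(ctx).await?;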
162 :
163 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
164 1636 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
165 1636 : Self {
166 1636 : inner,
167 1636 : offset: start_offset,
168 1636 : buf: Vec::with_capacity(Self::CAPACITY),
169 1636 : io_buf: Some(BytesMut::new()),
170 1636 : }
171 1636 : }
172 :
173 2025806 : pub fn size(&self) -> u64 {
174 2025806 : self.offset
175 2025806 : }
176 :
177 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
178 :
179 : /// Writes the given buffer directly to the underlying `VirtualFile`.
180 : /// You need to make sure that the internal buffer is empty, otherwise
181 : /// data will be written in the wrong order.
182 : #[inline(always)]
183 1084082 : async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
184 1084082 : &mut self,
185 1084082 : src_buf: B,
186 1084082 : ctx: &RequestContext,
187 1084082 : ) -> (B::Buf, Result<(), Error>) {
188 1084082 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
189 1084082 : let nbytes = match res {
190 1084082 : Ok(nbytes) => nbytes,
191 0 : Err(e) => return (src_buf, Err(e)),
192 : };
193 1084082 : self.offset += nbytes as u64;
194 1084082 : (src_buf, Ok(()))
195 1084082 : }
196 :
197 : #[inline(always)]
198 : /// Flushes the internal buffer to the underlying `VirtualFile`.
199 10708 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
200 10708 : let buf = std::mem::take(&mut self.buf);
201 10708 : let (mut buf, res) = self.inner.write_all(buf, ctx).await;
202 10708 : res?;
203 10708 : buf.clear();
204 10708 : self.buf = buf;
205 10708 : Ok(())
206 10708 : }
207 :
208 : #[inline(always)]
209 : /// Writes as much of `src_buf` into the internal buffer as fits
210 12943236 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
211 12943236 : let remaining = Self::CAPACITY - self.buf.len();
212 12943236 : let to_copy = src_buf.len().min(remaining);
213 12943236 : self.buf.extend_from_slice(&src_buf[..to_copy]);
214 12943236 : self.offset += to_copy as u64;
215 12943236 : to_copy
216 12943236 : }
217 :
218 : /// Internal, possibly buffered, write function
219 14018060 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
220 14018060 : &mut self,
221 14018060 : src_buf: B,
222 14018060 : ctx: &RequestContext,
223 14018060 : ) -> (B::Buf, Result<(), Error>) {
224 14018060 : if !BUFFERED {
225 1083852 : assert!(self.buf.is_empty());
226 1083852 : return self.write_all_unbuffered(src_buf, ctx).await;
227 12934208 : }
228 12934208 : let remaining = Self::CAPACITY - self.buf.len();
229 12934208 : let src_buf_len = src_buf.bytes_init();
230 12934208 : if src_buf_len == 0 {
231 24 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
232 12934184 : }
233 12934184 : let mut src_buf = src_buf.slice(0..src_buf_len);
234 12934184 : // First try to copy as much as we can into the buffer
235 12934184 : if remaining > 0 {
236 12934184 : let copied = self.write_into_buffer(&src_buf);
237 12934184 : src_buf = src_buf.slice(copied..);
238 12934184 : }
239 : // Then, if the buffer is full, flush it out
240 12934184 : if self.buf.len() == Self::CAPACITY {
241 9322 : if let Err(e) = self.flush_buffer(ctx).await {
242 0 : return (Slice::into_inner(src_buf), Err(e));
243 9322 : }
244 12924862 : }
245 : // Finally, write the tail of src_buf:
246 : // If it wholly fits into the buffer without
247 : // completely filling it, then put it there.
248 : // If not, write it out directly.
249 12934184 : let src_buf = if !src_buf.is_empty() {
250 9282 : assert_eq!(self.buf.len(), 0);
251 9282 : if src_buf.len() < Self::CAPACITY {
252 9052 : let copied = self.write_into_buffer(&src_buf);
253 9052 : // We just verified above that src_buf fits into our internal buffer.
254 9052 : assert_eq!(copied, src_buf.len());
255 9052 : Slice::into_inner(src_buf)
256 : } else {
257 230 : let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
258 230 : if let Err(e) = res {
259 0 : return (src_buf, Err(e));
260 230 : }
261 230 : src_buf
262 : }
263 : } else {
264 12924902 : Slice::into_inner(src_buf)
265 : };
266 12934184 : (src_buf, Ok(()))
267 14018060 : }
268 :
269 : /// Write a blob of data. Returns the offset that it was written to,
270 : /// which can be used to retrieve the data later.
271 10348 : pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
272 10348 : &mut self,
273 10348 : srcbuf: B,
274 10348 : ctx: &RequestContext,
275 10348 : ) -> (B::Buf, Result<u64, Error>) {
276 10348 : self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
277 4712 : .await
278 10348 : }
279 :
280 : /// Write a blob of data. Returns the offset that it was written to,
281 : /// which can be used to retrieve the data later.
282 7009030 : pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
283 7009030 : &mut self,
284 7009030 : srcbuf: B,
285 7009030 : ctx: &RequestContext,
286 7009030 : algorithm: ImageCompressionAlgorithm,
287 7009030 : ) -> (B::Buf, Result<u64, Error>) {
288 7009030 : let offset = self.offset;
289 7009030 :
290 7009030 : let len = srcbuf.bytes_init();
291 7009030 :
292 7009030 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
293 7009030 : io_buf.clear();
294 7009030 : let mut compressed_buf = None;
295 7009030 : let ((io_buf, hdr_res), srcbuf) = async {
296 7009030 : if len < 128 {
297 7009030 : // Short blob. Write a 1-byte length header
298 7009030 : io_buf.put_u8(len as u8);
299 6992666 : (
300 6992666 : self.write_all(io_buf, ctx).await,
301 7009030 : srcbuf.slice_full().into_inner(),
302 7009030 : )
303 7009030 : } else {
304 7009030 : // Write a 4-byte length header
305 7009030 : if len > MAX_SUPPORTED_LEN {
306 7009030 : return (
307 0 : (
308 0 : io_buf,
309 0 : Err(Error::new(
310 0 : ErrorKind::Other,
311 0 : format!("blob too large ({len} bytes)"),
312 0 : )),
313 0 : ),
314 0 : srcbuf.slice_full().into_inner(),
315 0 : );
316 7009030 : }
317 7009030 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
318 7009030 : ImageCompressionAlgorithm::Zstd { level } => {
319 7009030 : let mut encoder = if let Some(level) = level {
320 7009030 : async_compression::tokio::write::ZstdEncoder::with_quality(
321 2050 : Vec::new(),
322 2050 : Level::Precise(level.into()),
323 2050 : )
324 7009030 : } else {
325 7009030 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
326 7009030 : };
327 7009030 : let slice = srcbuf.slice_full();
328 2050 : encoder.write_all(&slice[..]).await.unwrap();
329 2050 : encoder.shutdown().await.unwrap();
330 2050 : let compressed = encoder.into_inner();
331 2050 : if compressed.len() < len {
332 7009030 : let compressed_len = compressed.len();
333 6 : compressed_buf = Some(compressed);
334 6 : (BYTE_ZSTD, compressed_len, slice.into_inner())
335 7009030 : } else {
336 7009030 : (BYTE_UNCOMPRESSED, len, slice.into_inner())
337 7009030 : }
338 7009030 : }
339 7009030 : ImageCompressionAlgorithm::Disabled => {
340 7009030 : (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
341 7009030 : }
342 7009030 : };
343 7009030 : let mut len_buf = (len_written as u32).to_be_bytes();
344 16364 : assert_eq!(len_buf[0] & 0xf0, 0);
345 7009030 : len_buf[0] |= high_bit_mask;
346 16364 : io_buf.extend_from_slice(&len_buf[..]);
347 16364 : (self.write_all(io_buf, ctx).await, srcbuf)
348 7009030 : }
349 7009030 : }
350 276099 : .await;
351 7009030 : self.io_buf = Some(io_buf);
352 7009030 : match hdr_res {
353 7009030 : Ok(_) => (),
354 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
355 : }
356 7009030 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
357 6 : let (_buf, res) = self.write_all(compressed_buf, ctx).await;
358 6 : (Slice::into_inner(srcbuf.slice(..)), res)
359 : } else {
360 7009024 : self.write_all(srcbuf, ctx).await
361 : };
362 7009030 : (srcbuf, res.map(|_| offset))
363 7009030 : }
364 : }
365 :
366 : impl BlobWriter<true> {
367 : /// Access the underlying `VirtualFile`.
368 : ///
369 : /// This function flushes the internal buffer before giving access
370 : /// to the underlying `VirtualFile`.
371 1346 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
372 1346 : self.flush_buffer(ctx).await?;
373 1346 : Ok(self.inner)
374 1346 : }
375 :
376 : /// Access the underlying `VirtualFile`.
377 : ///
378 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
379 : /// the internal buffer before giving access.
380 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
381 0 : self.inner
382 0 : }
383 : }
384 :
385 : impl BlobWriter<false> {
386 : /// Access the underlying `VirtualFile`.
387 250 : pub fn into_inner(self) -> VirtualFile {
388 250 : self.inner
389 250 : }
390 : }
391 :
392 : #[cfg(test)]
393 : pub(crate) mod tests {
394 : use super::*;
395 : use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
396 : use camino::Utf8PathBuf;
397 : use camino_tempfile::Utf8TempDir;
398 : use rand::{Rng, SeedableRng};
399 :
400 24 : async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
401 15054 : round_trip_test_compressed::<BUFFERED>(blobs, false).await
402 24 : }
403 :
404 40 : pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
405 40 : blobs: &[Vec<u8>],
406 40 : compression: bool,
407 40 : ctx: &RequestContext,
408 40 : ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
409 40 : let temp_dir = camino_tempfile::tempdir()?;
410 40 : let pathbuf = temp_dir.path().join("file");
411 40 :
412 40 : // Write part (in a block, so the file is dropped at the end)
413 40 : let mut offsets = Vec::new();
414 : {
415 40 : let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
416 40 : let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
417 12408 : for blob in blobs.iter() {
418 12408 : let (_, res) = if compression {
419 2100 : wtr.write_blob_maybe_compressed(
420 2100 : blob.clone(),
421 2100 : ctx,
422 2100 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
423 2100 : )
424 154 : .await
425 : } else {
426 10308 : wtr.write_blob(blob.clone(), ctx).await
427 : };
428 12408 : let offs = res?;
429 12408 : offsets.push(offs);
430 : }
431 : // Write out one page worth of zeros so that we can
432 : // read again with read_blk
433 40 : let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
434 40 : let offs = res?;
435 40 : println!("Writing final blob at offs={offs}");
436 40 : wtr.flush_buffer(ctx).await?;
437 : }
438 40 : Ok((temp_dir, pathbuf, offsets))
439 40 : }
440 :
441 32 : async fn round_trip_test_compressed<const BUFFERED: bool>(
442 32 : blobs: &[Vec<u8>],
443 32 : compression: bool,
444 32 : ) -> Result<(), Error> {
445 32 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
446 32 : let (_temp_dir, pathbuf, offsets) =
447 4630 : write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
448 :
449 32 : let file = VirtualFile::open(pathbuf, &ctx).await?;
450 32 : let rdr = BlockReaderRef::VirtualFile(&file);
451 32 : let rdr = BlockCursor::new_with_compression(rdr, compression);
452 8288 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
453 10619 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
454 8288 : assert_eq!(
455 8288 : blob, &blob_read,
456 0 : "mismatch for idx={idx} at offset={offset}"
457 : );
458 : }
459 32 : Ok(())
460 32 : }
461 :
462 6158 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
463 6158 : let mut rng = rand::thread_rng();
464 68151904 : (0..len).map(|_| rng.gen()).collect::<_>()
465 6158 : }
466 :
467 : #[tokio::test]
468 2 : async fn test_one() -> Result<(), Error> {
469 2 : let blobs = &[vec![12, 21, 22]];
470 8 : round_trip_test::<false>(blobs).await?;
471 4 : round_trip_test::<true>(blobs).await?;
472 2 : Ok(())
473 2 : }
474 :
475 : #[tokio::test]
476 2 : async fn test_hello_simple() -> Result<(), Error> {
477 2 : let blobs = &[
478 2 : vec![0, 1, 2, 3],
479 2 : b"Hello, World!".to_vec(),
480 2 : Vec::new(),
481 2 : b"foobar".to_vec(),
482 2 : ];
483 16 : round_trip_test::<false>(blobs).await?;
484 7 : round_trip_test::<true>(blobs).await?;
485 15 : round_trip_test_compressed::<false>(blobs, true).await?;
486 7 : round_trip_test_compressed::<true>(blobs, true).await?;
487 2 : Ok(())
488 2 : }
489 :
490 : #[tokio::test]
491 2 : async fn test_really_big_array() -> Result<(), Error> {
492 2 : let blobs = &[
493 2 : b"test".to_vec(),
494 2 : random_array(10 * PAGE_SZ),
495 2 : b"hello".to_vec(),
496 2 : random_array(66 * PAGE_SZ),
497 2 : vec![0xf3; 24 * PAGE_SZ],
498 2 : b"foobar".to_vec(),
499 2 : ];
500 124 : round_trip_test::<false>(blobs).await?;
501 116 : round_trip_test::<true>(blobs).await?;
502 100 : round_trip_test_compressed::<false>(blobs, true).await?;
503 89 : round_trip_test_compressed::<true>(blobs, true).await?;
504 2 : Ok(())
505 2 : }
506 :
507 : #[tokio::test]
508 2 : async fn test_arrays_inc() -> Result<(), Error> {
509 2 : let blobs = (0..PAGE_SZ / 8)
510 2048 : .map(|v| random_array(v * 16))
511 2 : .collect::<Vec<_>>();
512 4162 : round_trip_test::<false>(&blobs).await?;
513 2212 : round_trip_test::<true>(&blobs).await?;
514 2 : Ok(())
515 2 : }
516 :
517 : #[tokio::test]
518 2 : async fn test_arrays_random_size() -> Result<(), Error> {
519 2 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
520 2 : let blobs = (0..1024)
521 2048 : .map(|_| {
522 2048 : let mut sz: u16 = rng.gen();
523 2048 : // Make 50% of the arrays small
524 2048 : if rng.gen() {
525 1032 : sz &= 63;
526 1032 : }
527 2048 : random_array(sz.into())
528 2048 : })
529 2 : .collect::<Vec<_>>();
530 5106 : round_trip_test::<false>(&blobs).await?;
531 3279 : round_trip_test::<true>(&blobs).await?;
532 2 : Ok(())
533 2 : }
534 :
535 : #[tokio::test]
536 2 : async fn test_arrays_page_boundary() -> Result<(), Error> {
537 2 : let blobs = &[
538 2 : random_array(PAGE_SZ - 4),
539 2 : random_array(PAGE_SZ - 4),
540 2 : random_array(PAGE_SZ - 4),
541 2 : ];
542 14 : round_trip_test::<false>(blobs).await?;
543 6 : round_trip_test::<true>(blobs).await?;
544 2 : Ok(())
545 2 : }
546 : }