Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as one byte. If it's larger than that, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
9 : //! by peeking at the first byte. For blobs of 128 bytes or more,
10 : //! the header also carries three reserved bits; only one of the
11 : //! possible three-bit patterns is currently in use (0b001, see
12 : //! `BYTE_ZSTD`), and it signifies compression with zstd.
13 : //!
14 : //! len < 128: 0XXXXXXX
15 : //! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
16 : //!
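// A minimal sketch (not part of this module's API) of encoding a length
// header in the format described above: a single byte for short blobs, or a
// big-endian u32 with the high bit set (plus the zstd marker bit) for larger
// ones. The stored length is the on-disk payload length, i.e. the compressed
// length if the payload was compressed.
#[allow(dead_code)]
fn encode_len_header_sketch(len: usize, zstd: bool) -> Vec<u8> {
    if len < 128 {
        // 0XXXXXXX: blobs this short are never compressed by this module.
        vec![len as u8]
    } else {
        assert!(len <= MAX_SUPPORTED_BLOB_LEN, "blob too large ({len} bytes)");
        // 1CCCXXXX ...: length in the low 28 bits, big-endian, with the high
        // bit set and the reserved bits set to 0b001 for zstd.
        let mut buf = (len as u32).to_be_bytes();
        buf[0] |= if zstd { BYTE_ZSTD } else { BYTE_UNCOMPRESSED };
        buf.to_vec()
    }
}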
17 : use std::cmp::min;
18 : use std::io::Error;
19 :
20 : use async_compression::Level;
21 : use bytes::{BufMut, BytesMut};
22 : use pageserver_api::models::ImageCompressionAlgorithm;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::warn;
27 :
28 : use crate::context::RequestContext;
29 : use crate::page_cache::PAGE_SZ;
30 : use crate::tenant::block_io::BlockCursor;
31 : use crate::virtual_file::VirtualFile;
32 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
33 :
34 : #[derive(Copy, Clone, Debug)]
35 : pub struct CompressionInfo {
36 : pub written_compressed: bool,
37 : pub compressed_size: Option<usize>,
38 : }
39 :
40 : /// A blob header, with header+data length and compression info.
41 : ///
42 : /// TODO: use this more widely, and add an encode() method too.
43 : /// TODO: document the header format.
44 : #[derive(Clone, Copy, Default)]
45 : pub struct Header {
46 : pub header_len: usize,
47 : pub data_len: usize,
48 : pub compression_bits: u8,
49 : }
50 :
51 : impl Header {
52 : /// Decodes a header from a byte slice.
53 23347181 : pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
54 23347181 : let Some(&first_header_byte) = bytes.first() else {
55 0 : return Err(std::io::Error::new(
56 0 : std::io::ErrorKind::InvalidData,
57 0 : "zero-length blob header",
58 0 : ));
59 : };
60 :
61 : // If the high bit is 0, this is just a 1-byte length prefix for lengths below 128 bytes.
62 23347181 : if first_header_byte < 0x80 {
63 23096597 : return Ok(Self {
64 23096597 : header_len: 1, // by definition
65 23096597 : data_len: first_header_byte as usize,
66 23096597 : compression_bits: BYTE_UNCOMPRESSED,
67 23096597 : });
68 250584 : }
69 :
70 : // Otherwise, this is a 4-byte header containing compression information and length.
71 : const HEADER_LEN: usize = 4;
72 250584 : let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
73 0 : std::io::Error::new(
74 0 : std::io::ErrorKind::InvalidData,
75 0 : format!("blob header too short: {bytes:?}"),
76 0 : )
77 250584 : })?;
78 :
79 : // TODO: verify the compression bits and convert to an enum.
80 250584 : let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
81 250584 : header_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
82 250584 : let data_len = u32::from_be_bytes(header_buf) as usize;
83 250584 :
84 250584 : Ok(Self {
85 250584 : header_len: HEADER_LEN,
86 250584 : data_len,
87 250584 : compression_bits,
88 250584 : })
89 23347181 : }
90 :
91 : /// Returns the total header+data length.
92 122976 : pub fn total_len(&self) -> usize {
93 122976 : self.header_len + self.data_len
94 122976 : }
95 : }
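// Hypothetical helper sketch (not an existing API): validate that a raw
// buffer holds exactly one blob, header included. This mirrors the check
// that `BlobWriter::write_blob_raw` performs further down before writing.
#[allow(dead_code)]
fn validate_raw_blob_sketch(raw: &[u8]) -> Result<Header, std::io::Error> {
    let header = Header::decode(raw)?;
    if raw.len() != header.total_len() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!(
                "length mismatch: buffer is {} bytes, header says {}",
                raw.len(),
                header.total_len()
            ),
        ));
    }
    Ok(header)
}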
96 :
97 : impl BlockCursor<'_> {
98 : /// Read a blob into a new buffer.
99 49776 : pub async fn read_blob(
100 49776 : &self,
101 49776 : offset: u64,
102 49776 : ctx: &RequestContext,
103 49776 : ) -> Result<Vec<u8>, std::io::Error> {
104 49776 : let mut buf = Vec::new();
105 49776 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
106 49776 : Ok(buf)
107 49776 : }
108 : /// Read a blob into the given buffer. Any previous contents in the buffer
109 : /// are overwritten.
110 50160 : pub async fn read_blob_into_buf(
111 50160 : &self,
112 50160 : offset: u64,
113 50160 : dstbuf: &mut Vec<u8>,
114 50160 : ctx: &RequestContext,
115 50160 : ) -> Result<(), std::io::Error> {
116 50160 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
117 50160 : let mut off = (offset % PAGE_SZ as u64) as usize;
118 :
119 50160 : let mut buf = self.read_blk(blknum, ctx).await?;
120 :
121 : // peek at the first byte to determine whether it's a 1- or 4-byte length header
122 50160 : let first_len_byte = buf[off];
123 50160 : let len: usize = if first_len_byte < 0x80 {
124 : // 1-byte length header
125 13344 : off += 1;
126 13344 : first_len_byte as usize
127 : } else {
128 : // 4-byte length header
129 36816 : let mut len_buf = [0u8; 4];
130 36816 : let thislen = PAGE_SZ - off;
131 36816 : if thislen < 4 {
132 : // it is split across two pages
133 0 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
134 0 : blknum += 1;
135 0 : buf = self.read_blk(blknum, ctx).await?;
136 0 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
137 0 : off = 4 - thislen;
138 36816 : } else {
139 36816 : len_buf.copy_from_slice(&buf[off..off + 4]);
140 36816 : off += 4;
141 36816 : }
142 36816 : let bit_mask = if self.read_compressed {
143 120 : !LEN_COMPRESSION_BIT_MASK
144 : } else {
145 36696 : 0x7f
146 : };
147 36816 : len_buf[0] &= bit_mask;
148 36816 : u32::from_be_bytes(len_buf) as usize
149 : };
150 50160 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
151 50160 :
152 50160 : let mut tmp_buf = Vec::new();
153 : let buf_to_write;
154 50160 : let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
155 50136 : if compression_bits > BYTE_UNCOMPRESSED {
156 0 : warn!("reading key above future limit ({len} bytes)");
157 50136 : }
158 50136 : buf_to_write = dstbuf;
159 50136 : None
160 24 : } else if compression_bits == BYTE_ZSTD {
161 24 : buf_to_write = &mut tmp_buf;
162 24 : Some(dstbuf)
163 : } else {
164 0 : let error = std::io::Error::new(
165 0 : std::io::ErrorKind::InvalidData,
166 0 : format!("invalid compression byte {compression_bits:x}"),
167 0 : );
168 0 : return Err(error);
169 : };
170 :
171 50160 : buf_to_write.clear();
172 50160 : buf_to_write.reserve(len);
173 50160 :
174 50160 : // Read the payload
175 50160 : let mut remain = len;
176 176616 : while remain > 0 {
177 126456 : let mut page_remain = PAGE_SZ - off;
178 126456 : if page_remain == 0 {
179 : // continue on next page
180 76536 : blknum += 1;
181 76536 : buf = self.read_blk(blknum, ctx).await?;
182 76536 : off = 0;
183 76536 : page_remain = PAGE_SZ;
184 49920 : }
185 126456 : let this_blk_len = min(remain, page_remain);
186 126456 : buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
187 126456 : remain -= this_blk_len;
188 126456 : off += this_blk_len;
189 : }
190 :
191 50160 : if let Some(dstbuf) = compression {
192 24 : if compression_bits == BYTE_ZSTD {
193 24 : let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
194 24 : decoder.write_all(buf_to_write).await?;
195 24 : decoder.flush().await?;
196 : } else {
197 0 : unreachable!("already checked above")
198 : }
199 50136 : }
200 :
201 50160 : Ok(())
202 50160 : }
203 : }
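// Hypothetical usage sketch: read a single blob back from a file, given the
// offset previously returned by `BlobWriter::write_blob`. This mirrors the
// round-trip tests at the bottom of this file; pass `true` to
// `new_with_compression` whenever compressed blobs may be present.
#[cfg(test)]
#[allow(dead_code)]
async fn read_one_blob_sketch(
    file: &VirtualFile,
    offset: u64,
    ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
    use crate::tenant::block_io::BlockReaderRef;
    let reader = BlockReaderRef::VirtualFile(file);
    let cursor = BlockCursor::new_with_compression(reader, true);
    cursor.read_blob(offset, ctx).await
}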
204 :
205 : /// Mask covering the 4-byte-header bit and the three reserved compression bits
206 : pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
207 :
208 : /// The maximum size of blobs we support. The highest four bits of the
209 : /// 4-byte length header are reserved for compression and future use.
210 : pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff;
211 :
212 : pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
213 : pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
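// A small sanity-check sketch of the bit layout above: the MSB marks a
// 4-byte header, zstd additionally sets the lowest of the three reserved
// bits, and the length itself must fit in the remaining 28 bits.
#[cfg(test)]
#[test]
fn header_bit_layout_sketch() {
    assert_eq!(BYTE_UNCOMPRESSED, 0b1000_0000);
    assert_eq!(BYTE_ZSTD, 0b1001_0000);
    assert_eq!(BYTE_ZSTD & !LEN_COMPRESSION_BIT_MASK, 0);
    assert_eq!(MAX_SUPPORTED_BLOB_LEN, (1 << 28) - 1);
}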
214 :
215 : /// A wrapper of `VirtualFile` that allows users to write blobs.
216 : ///
217 : /// If a `BlobWriter` is dropped, the internal buffer will be
218 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
219 : /// manually before dropping.
220 : pub struct BlobWriter<const BUFFERED: bool> {
221 : inner: VirtualFile,
222 : offset: u64,
223 : /// A buffer to save on write calls, only used if BUFFERED=true
224 : buf: Vec<u8>,
225 : /// We do tiny writes for the length headers; they need to be in an owned buffer.
226 : io_buf: Option<BytesMut>,
227 : }
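// Hypothetical usage sketch: write one blob, remember its offset, and flush
// the internal buffer before handing the file back. `file`, `gate`, `cancel`
// and `ctx` are assumed to be set up by the caller, as in the tests at the
// bottom of this file.
#[cfg(test)]
#[allow(dead_code)]
async fn write_one_blob_sketch(
    file: VirtualFile,
    gate: &utils::sync::gate::Gate,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> Result<(VirtualFile, u64), Error> {
    let mut writer = BlobWriter::<true>::new(file, 0, gate, cancel, ctx);
    let (_buf, res) = writer.write_blob(b"hello".to_vec().slice_len(), ctx).await;
    let offset = res?;
    // `into_inner` flushes; dropping the writer instead would lose buffered data.
    let file = writer.into_inner(ctx).await?;
    Ok((file, offset))
}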
228 :
229 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
230 12840 : pub fn new(
231 12840 : inner: VirtualFile,
232 12840 : start_offset: u64,
233 12840 : _gate: &utils::sync::gate::Gate,
234 12840 : _cancel: CancellationToken,
235 12840 : _ctx: &RequestContext,
236 12840 : ) -> Self {
237 12840 : Self {
238 12840 : inner,
239 12840 : offset: start_offset,
240 12840 : buf: Vec::with_capacity(Self::CAPACITY),
241 12840 : io_buf: Some(BytesMut::new()),
242 12840 : }
243 12840 : }
244 :
245 12299256 : pub fn size(&self) -> u64 {
246 12299256 : self.offset
247 12299256 : }
248 :
249 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
250 :
251 : /// Writes the given buffer directly to the underlying `VirtualFile`.
252 : /// You need to make sure that the internal buffer is empty, otherwise
253 : /// data will be written in the wrong order.
254 : #[inline(always)]
255 619260 : async fn write_all_unbuffered<Buf: IoBuf + Send>(
256 619260 : &mut self,
257 619260 : src_buf: FullSlice<Buf>,
258 619260 : ctx: &RequestContext,
259 619260 : ) -> (FullSlice<Buf>, Result<(), Error>) {
260 619260 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
261 619260 : let nbytes = match res {
262 619260 : Ok(nbytes) => nbytes,
263 0 : Err(e) => return (src_buf, Err(e)),
264 : };
265 619260 : self.offset += nbytes as u64;
266 619260 : (src_buf, Ok(()))
267 619260 : }
268 :
269 : #[inline(always)]
270 : /// Flushes the internal buffer to the underlying `VirtualFile`.
271 73932 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
272 73932 : let buf = std::mem::take(&mut self.buf);
273 73932 : let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
274 73932 : res?;
275 73932 : let mut buf = slice.into_raw_slice().into_inner();
276 73932 : buf.clear();
277 73932 : self.buf = buf;
278 73932 : Ok(())
279 73932 : }
280 :
281 : #[inline(always)]
282 : /// Writes as much of `src_buf` into the internal buffer as will fit
283 78066408 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
284 78066408 : let remaining = Self::CAPACITY - self.buf.len();
285 78066408 : let to_copy = src_buf.len().min(remaining);
286 78066408 : self.buf.extend_from_slice(&src_buf[..to_copy]);
287 78066408 : self.offset += to_copy as u64;
288 78066408 : to_copy
289 78066408 : }
290 :
291 : /// Internal, possibly buffered, write function
292 78621072 : async fn write_all<Buf: IoBuf + Send>(
293 78621072 : &mut self,
294 78621072 : src_buf: FullSlice<Buf>,
295 78621072 : ctx: &RequestContext,
296 78621072 : ) -> (FullSlice<Buf>, Result<(), Error>) {
297 78621072 : let src_buf = src_buf.into_raw_slice();
298 78621072 : let src_buf_bounds = src_buf.bounds();
299 78621072 : let restore = move |src_buf_slice: Slice<_>| {
300 78001812 : FullSlice::must_new(Slice::from_buf_bounds(
301 78001812 : src_buf_slice.into_inner(),
302 78001812 : src_buf_bounds,
303 78001812 : ))
304 78001812 : };
305 :
306 78621072 : if !BUFFERED {
307 617880 : assert!(self.buf.is_empty());
308 617880 : return self
309 617880 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
310 617880 : .await;
311 78003192 : }
312 78003192 : let remaining = Self::CAPACITY - self.buf.len();
313 78003192 : let src_buf_len = src_buf.bytes_init();
314 78003192 : if src_buf_len == 0 {
315 144 : return (restore(src_buf), Ok(()));
316 78003048 : }
317 78003048 : let mut src_buf = src_buf.slice(0..src_buf_len);
318 78003048 : // First try to copy as much as we can into the buffer
319 78003048 : if remaining > 0 {
320 78003048 : let copied = self.write_into_buffer(&src_buf);
321 78003048 : src_buf = src_buf.slice(copied..);
322 78003048 : }
323 : // Then, if the buffer is full, flush it out
324 78003048 : if self.buf.len() == Self::CAPACITY {
325 64992 : if let Err(e) = self.flush_buffer(ctx).await {
326 0 : return (restore(src_buf), Err(e));
327 64992 : }
328 77938056 : }
329 : // Finally, write the tail of src_buf:
330 : // If it wholly fits into the buffer without
331 : // completely filling it, then put it there.
332 : // If not, write it out directly.
333 78003048 : let src_buf = if !src_buf.is_empty() {
334 64740 : assert_eq!(self.buf.len(), 0);
335 64740 : if src_buf.len() < Self::CAPACITY {
336 63360 : let copied = self.write_into_buffer(&src_buf);
337 63360 : // We just verified above that src_buf fits into our internal buffer.
338 63360 : assert_eq!(copied, src_buf.len());
339 63360 : restore(src_buf)
340 : } else {
341 1380 : let (src_buf, res) = self
342 1380 : .write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
343 1380 : .await;
344 1380 : if let Err(e) = res {
345 0 : return (src_buf, Err(e));
346 1380 : }
347 1380 : src_buf
348 : }
349 : } else {
350 77938308 : restore(src_buf)
351 : };
352 78003048 : (src_buf, Ok(()))
353 78621072 : }
354 :
355 : /// Write a blob of data. Returns the offset that it was written to,
356 : /// which can be used to retrieve the data later.
357 62088 : pub async fn write_blob<Buf: IoBuf + Send>(
358 62088 : &mut self,
359 62088 : srcbuf: FullSlice<Buf>,
360 62088 : ctx: &RequestContext,
361 62088 : ) -> (FullSlice<Buf>, Result<u64, Error>) {
362 62088 : let (buf, res) = self
363 62088 : .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
364 62088 : .await;
365 62088 : (buf, res.map(|(off, _compression_info)| off))
366 62088 : }
367 :
368 : /// Write a blob of data, optionally compressing it with the given algorithm.
369 : /// Returns the offset that it was written to, which can be used to retrieve the data later.
370 39261384 : pub(crate) async fn write_blob_maybe_compressed<Buf: IoBuf + Send>(
371 39261384 : &mut self,
372 39261384 : srcbuf: FullSlice<Buf>,
373 39261384 : ctx: &RequestContext,
374 39261384 : algorithm: ImageCompressionAlgorithm,
375 39261384 : ) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
376 39261384 : let offset = self.offset;
377 39261384 : let mut compression_info = CompressionInfo {
378 39261384 : written_compressed: false,
379 39261384 : compressed_size: None,
380 39261384 : };
381 39261384 :
382 39261384 : let len = srcbuf.len();
383 39261384 :
384 39261384 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
385 39261384 : io_buf.clear();
386 39261384 : let mut compressed_buf = None;
387 39261384 : let ((io_buf_slice, hdr_res), srcbuf) = async {
388 39261384 : if len < 128 {
389 : // Short blob. Write a 1-byte length header
390 39043176 : io_buf.put_u8(len as u8);
391 39043176 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
392 : } else {
393 : // Write a 4-byte length header
394 218208 : if len > MAX_SUPPORTED_BLOB_LEN {
395 0 : return (
396 0 : (
397 0 : io_buf.slice_len(),
398 0 : Err(Error::other(format!("blob too large ({len} bytes)"))),
399 0 : ),
400 0 : srcbuf,
401 0 : );
402 218208 : }
403 218208 : let (high_bit_mask, len_written, srcbuf) = match algorithm {
404 60312 : ImageCompressionAlgorithm::Zstd { level } => {
405 60312 : let mut encoder = if let Some(level) = level {
406 60312 : async_compression::tokio::write::ZstdEncoder::with_quality(
407 60312 : Vec::new(),
408 60312 : Level::Precise(level.into()),
409 60312 : )
410 : } else {
411 0 : async_compression::tokio::write::ZstdEncoder::new(Vec::new())
412 : };
413 60312 : encoder.write_all(&srcbuf[..]).await.unwrap();
414 60312 : encoder.shutdown().await.unwrap();
415 60312 : let compressed = encoder.into_inner();
416 60312 : compression_info.compressed_size = Some(compressed.len());
417 60312 : if compressed.len() < len {
418 36 : compression_info.written_compressed = true;
419 36 : let compressed_len = compressed.len();
420 36 : compressed_buf = Some(compressed);
421 36 : (BYTE_ZSTD, compressed_len, srcbuf)
422 : } else {
423 60276 : (BYTE_UNCOMPRESSED, len, srcbuf)
424 : }
425 : }
426 157896 : ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf),
427 : };
428 218208 : let mut len_buf = (len_written as u32).to_be_bytes();
429 218208 : assert_eq!(len_buf[0] & 0xf0, 0);
430 218208 : len_buf[0] |= high_bit_mask;
431 218208 : io_buf.extend_from_slice(&len_buf[..]);
432 218208 : (self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
433 : }
434 39261384 : }
435 39261384 : .await;
436 39261384 : self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner());
437 39261384 : match hdr_res {
438 39261384 : Ok(_) => (),
439 0 : Err(e) => return (srcbuf, Err(e)),
440 : }
441 39261384 : let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
442 36 : let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await;
443 36 : (srcbuf, res)
444 : } else {
445 39261348 : self.write_all(srcbuf, ctx).await
446 : };
447 39261384 : (srcbuf, res.map(|_| (offset, compression_info)))
448 39261384 : }
449 :
450 : /// Writes a raw blob containing both header and data, returning its offset.
451 98304 : pub(crate) async fn write_blob_raw<Buf: IoBuf + Send>(
452 98304 : &mut self,
453 98304 : raw_with_header: FullSlice<Buf>,
454 98304 : ctx: &RequestContext,
455 98304 : ) -> (FullSlice<Buf>, Result<u64, Error>) {
456 : // Verify the header, to ensure we don't write invalid/corrupt data.
457 98304 : let header = match Header::decode(&raw_with_header) {
458 98304 : Ok(header) => header,
459 0 : Err(err) => return (raw_with_header, Err(err)),
460 : };
461 98304 : if raw_with_header.len() != header.total_len() {
462 0 : let header_total_len = header.total_len();
463 0 : let raw_len = raw_with_header.len();
464 0 : return (
465 0 : raw_with_header,
466 0 : Err(std::io::Error::new(
467 0 : std::io::ErrorKind::InvalidData,
468 0 : format!("header length mismatch: {header_total_len} != {raw_len}"),
469 0 : )),
470 0 : );
471 98304 : }
472 98304 :
473 98304 : let offset = self.offset;
474 98304 : let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
475 98304 : (raw_with_header, result.map(|_| offset))
476 98304 : }
477 : }
478 :
479 : impl BlobWriter<true> {
480 : /// Access the underlying `VirtualFile`.
481 : ///
482 : /// This function flushes the internal buffer before giving access
483 : /// to the underlying `VirtualFile`.
484 8700 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
485 8700 : self.flush_buffer(ctx).await?;
486 8700 : Ok(self.inner)
487 8700 : }
488 :
489 : /// Access the underlying `VirtualFile`.
490 : ///
491 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
492 : /// the internal buffer before giving access.
493 156 : pub fn into_inner_no_flush(self) -> VirtualFile {
494 156 : self.inner
495 156 : }
496 : }
497 :
498 : impl BlobWriter<false> {
499 : /// Access the underlying `VirtualFile`.
500 3744 : pub fn into_inner(self) -> VirtualFile {
501 3744 : self.inner
502 3744 : }
503 : }
504 :
505 : #[cfg(test)]
506 : pub(crate) mod tests {
507 : use camino::Utf8PathBuf;
508 : use camino_tempfile::Utf8TempDir;
509 : use rand::{Rng, SeedableRng};
510 :
511 : use super::*;
512 : use crate::context::DownloadBehavior;
513 : use crate::task_mgr::TaskKind;
514 : use crate::tenant::block_io::BlockReaderRef;
515 :
516 144 : async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
517 144 : round_trip_test_compressed::<BUFFERED>(blobs, false).await
518 144 : }
519 :
520 240 : pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
521 240 : blobs: &[Vec<u8>],
522 240 : compression: bool,
523 240 : ctx: &RequestContext,
524 240 : ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
525 240 : let temp_dir = camino_tempfile::tempdir()?;
526 240 : let pathbuf = temp_dir.path().join("file");
527 240 : let gate = utils::sync::gate::Gate::default();
528 240 : let cancel = CancellationToken::new();
529 240 :
530 240 : // Write part (in block to drop the file)
531 240 : let mut offsets = Vec::new();
532 : {
533 240 : let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
534 240 : let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
535 74448 : for blob in blobs.iter() {
536 74448 : let (_, res) = if compression {
537 12600 : let res = wtr
538 12600 : .write_blob_maybe_compressed(
539 12600 : blob.clone().slice_len(),
540 12600 : ctx,
541 12600 : ImageCompressionAlgorithm::Zstd { level: Some(1) },
542 12600 : )
543 12600 : .await;
544 12600 : (res.0, res.1.map(|(off, _)| off))
545 : } else {
546 61848 : wtr.write_blob(blob.clone().slice_len(), ctx).await
547 : };
548 74448 : let offs = res?;
549 74448 : offsets.push(offs);
550 : }
551 : // Write out one page worth of zeros so that we can
552 : // read again with read_blk
553 240 : let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
554 240 : let offs = res?;
555 240 : println!("Writing final blob at offs={offs}");
556 240 : wtr.flush_buffer(ctx).await?;
557 : }
558 240 : Ok((temp_dir, pathbuf, offsets))
559 240 : }
560 :
561 192 : async fn round_trip_test_compressed<const BUFFERED: bool>(
562 192 : blobs: &[Vec<u8>],
563 192 : compression: bool,
564 192 : ) -> Result<(), Error> {
565 192 : let ctx =
566 192 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
567 192 : let (_temp_dir, pathbuf, offsets) =
568 192 : write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
569 :
570 192 : let file = VirtualFile::open(pathbuf, &ctx).await?;
571 192 : let rdr = BlockReaderRef::VirtualFile(&file);
572 192 : let rdr = BlockCursor::new_with_compression(rdr, compression);
573 49728 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
574 49728 : let blob_read = rdr.read_blob(*offset, &ctx).await?;
575 49728 : assert_eq!(
576 49728 : blob, &blob_read,
577 0 : "mismatch for idx={idx} at offset={offset}"
578 : );
579 : }
580 192 : Ok(())
581 192 : }
582 :
583 36948 : pub(crate) fn random_array(len: usize) -> Vec<u8> {
584 36948 : let mut rng = rand::thread_rng();
585 408911424 : (0..len).map(|_| rng.r#gen()).collect::<_>()
586 36948 : }
587 :
588 : #[tokio::test]
589 12 : async fn test_one() -> Result<(), Error> {
590 12 : let blobs = &[vec![12, 21, 22]];
591 12 : round_trip_test::<false>(blobs).await?;
592 12 : round_trip_test::<true>(blobs).await?;
593 12 : Ok(())
594 12 : }
595 :
596 : #[tokio::test]
597 12 : async fn test_hello_simple() -> Result<(), Error> {
598 12 : let blobs = &[
599 12 : vec![0, 1, 2, 3],
600 12 : b"Hello, World!".to_vec(),
601 12 : Vec::new(),
602 12 : b"foobar".to_vec(),
603 12 : ];
604 12 : round_trip_test::<false>(blobs).await?;
605 12 : round_trip_test::<true>(blobs).await?;
606 12 : round_trip_test_compressed::<false>(blobs, true).await?;
607 12 : round_trip_test_compressed::<true>(blobs, true).await?;
608 12 : Ok(())
609 12 : }
610 :
611 : #[tokio::test]
612 12 : async fn test_really_big_array() -> Result<(), Error> {
613 12 : let blobs = &[
614 12 : b"test".to_vec(),
615 12 : random_array(10 * PAGE_SZ),
616 12 : b"hello".to_vec(),
617 12 : random_array(66 * PAGE_SZ),
618 12 : vec![0xf3; 24 * PAGE_SZ],
619 12 : b"foobar".to_vec(),
620 12 : ];
621 12 : round_trip_test::<false>(blobs).await?;
622 12 : round_trip_test::<true>(blobs).await?;
623 12 : round_trip_test_compressed::<false>(blobs, true).await?;
624 12 : round_trip_test_compressed::<true>(blobs, true).await?;
625 12 : Ok(())
626 12 : }
627 :
628 : #[tokio::test]
629 12 : async fn test_arrays_inc() -> Result<(), Error> {
630 12 : let blobs = (0..PAGE_SZ / 8)
631 12288 : .map(|v| random_array(v * 16))
632 12 : .collect::<Vec<_>>();
633 12 : round_trip_test::<false>(&blobs).await?;
634 12 : round_trip_test::<true>(&blobs).await?;
635 12 : Ok(())
636 12 : }
637 :
638 : #[tokio::test]
639 12 : async fn test_arrays_random_size() -> Result<(), Error> {
640 12 : let mut rng = rand::rngs::StdRng::seed_from_u64(42);
641 12 : let blobs = (0..1024)
642 12288 : .map(|_| {
643 12288 : let mut sz: u16 = rng.r#gen();
644 12288 : // Make 50% of the arrays small
645 12288 : if rng.r#gen() {
646 6192 : sz &= 63;
647 6192 : }
648 12288 : random_array(sz.into())
649 12288 : })
650 12 : .collect::<Vec<_>>();
651 12 : round_trip_test::<false>(&blobs).await?;
652 12 : round_trip_test::<true>(&blobs).await?;
653 12 : Ok(())
654 12 : }
655 :
656 : #[tokio::test]
657 12 : async fn test_arrays_page_boundary() -> Result<(), Error> {
658 12 : let blobs = &[
659 12 : random_array(PAGE_SZ - 4),
660 12 : random_array(PAGE_SZ - 4),
661 12 : random_array(PAGE_SZ - 4),
662 12 : ];
663 12 : round_trip_test::<false>(blobs).await?;
664 12 : round_trip_test::<true>(blobs).await?;
665 12 : Ok(())
666 12 : }
667 : }
|