//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is less than 128, it is written as a
//! single byte. Otherwise, the length is written as a four-byte
//! big-endian integer with the high bit set. This way, we can tell
//! whether the header is 1 or 4 bytes long by peeking at the first
//! byte. For blobs of 128 bytes or more, the three bits after the
//! high bit are reserved for compression; only one non-zero bit
//! pattern is currently in use (0b001), which signifies compression
//! with zstd.
//!
//! len <  128: 0XXXXXXX
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
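//!
//! For example (illustrative, following the encoding above): a 3-byte
//! blob gets the single header byte 0x03, while an uncompressed
//! 0x1234-byte blob gets the four header bytes 0x80 0x00 0x12 0x34.
//! With zstd compression the first header byte is 0x90 instead, and
//! the remaining bits hold the compressed length.
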
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};

impl<'a> BlockCursor<'a> {
    /// Read a blob into a new buffer.
    pub async fn read_blob(
        &self,
        offset: u64,
        ctx: &RequestContext,
    ) -> Result<Vec<u8>, std::io::Error> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf, ctx).await?;
        Ok(buf)
    }
    /// Read a blob into the given buffer. Any previous contents in the
    /// buffer are overwritten.
    pub async fn read_blob_into_buf(
        &self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
        ctx: &RequestContext,
    ) -> Result<(), std::io::Error> {
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum, ctx).await?;

        // peek at the first byte, to determine if it's a 1- or 4-byte length
        let first_len_byte = buf[off];
        let len: usize = if first_len_byte < 0x80 {
            // 1-byte length header
            off += 1;
            first_len_byte as usize
        } else {
            // 4-byte length header
            let mut len_buf = [0u8; 4];
            let thislen = PAGE_SZ - off;
            if thislen < 4 {
                // it is split across two pages
                len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
                off = 4 - thislen;
            } else {
                len_buf.copy_from_slice(&buf[off..off + 4]);
                off += 4;
            }
            let bit_mask = if self.read_compressed {
                !LEN_COMPRESSION_BIT_MASK
            } else {
                0x7f
            };
            len_buf[0] &= bit_mask;
            u32::from_be_bytes(len_buf) as usize
        };
        let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

        let mut tmp_buf = Vec::new();
        let buf_to_write;
        let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
            if compression_bits > BYTE_UNCOMPRESSED {
                warn!("reading key above future limit ({len} bytes)");
            }
            buf_to_write = dstbuf;
            None
        } else if compression_bits == BYTE_ZSTD {
            buf_to_write = &mut tmp_buf;
            Some(dstbuf)
        } else {
            let error = std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid compression byte {compression_bits:x}"),
            );
            return Err(error);
        };

        buf_to_write.clear();
        buf_to_write.reserve(len);

        // Read the payload
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // continue on next page
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }

        if let Some(dstbuf) = compression {
            if compression_bits == BYTE_ZSTD {
                let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
                decoder.write_all(buf_to_write).await?;
                decoder.flush().await?;
            } else {
                unreachable!("already checked above")
            }
        }

        Ok(())
    }
}

/// Reserved bits for length and compression
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other future uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
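// Concretely, BYTE_UNCOMPRESSED is 0x80 (high bit set, reserved bits 0b000)
// and BYTE_ZSTD is 0x90 (reserved bits 0b001); `read_blob_into_buf` compares
// them against the masked top nibble of the first length byte.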

/// A wrapper around `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer is discarded. You
/// need to call [`flush_buffer`](Self::flush_buffer) manually before
/// dropping it.
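///
/// A minimal usage sketch (not compiled as a doctest; it assumes an already
/// opened `VirtualFile` and a `RequestContext`, as in the tests below):
///
/// ```ignore
/// let mut writer = BlobWriter::<true>::new(file, 0);
/// let (_buf, res) = writer.write_blob(b"hello".to_vec(), &ctx).await;
/// let offset = res?;
/// writer.flush_buffer(&ctx).await?;
/// ```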
pub struct BlobWriter<const BUFFERED: bool> {
    inner: VirtualFile,
    offset: u64,
    /// A buffer to save on write calls, only used if BUFFERED=true
    buf: Vec<u8>,
    /// We do tiny writes for the length headers; they need to be in an owned buffer
    io_buf: Option<BytesMut>,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
    pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
        Self {
            inner,
            offset: start_offset,
            buf: Vec::with_capacity(Self::CAPACITY),
            io_buf: Some(BytesMut::new()),
        }
    }

    pub fn size(&self) -> u64 {
        self.offset
    }

    const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };

    /// Writes the given buffer directly to the underlying `VirtualFile`.
    /// You need to make sure that the internal buffer is empty, otherwise
    /// data will be written in the wrong order.
    #[inline(always)]
    async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
        let nbytes = match res {
            Ok(nbytes) => nbytes,
            Err(e) => return (src_buf, Err(e)),
        };
        self.offset += nbytes as u64;
        (src_buf, Ok(()))
    }

    /// Flushes the internal buffer to the underlying `VirtualFile`.
    #[inline(always)]
    pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
        let buf = std::mem::take(&mut self.buf);
        let (mut buf, res) = self.inner.write_all(buf, ctx).await;
        res?;
        buf.clear();
        self.buf = buf;
        Ok(())
    }

    /// Writes as much of `src_buf` into the internal buffer as fits.
    #[inline(always)]
    fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
        let remaining = Self::CAPACITY - self.buf.len();
        let to_copy = src_buf.len().min(remaining);
        self.buf.extend_from_slice(&src_buf[..to_copy]);
        self.offset += to_copy as u64;
        to_copy
    }

    /// Internal, possibly buffered, write function
    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        if !BUFFERED {
            assert!(self.buf.is_empty());
            return self.write_all_unbuffered(src_buf, ctx).await;
        }
        let remaining = Self::CAPACITY - self.buf.len();
        let src_buf_len = src_buf.bytes_init();
        if src_buf_len == 0 {
            return (Slice::into_inner(src_buf.slice_full()), Ok(()));
        }
        let mut src_buf = src_buf.slice(0..src_buf_len);
        // First try to copy as much as we can into the buffer
        if remaining > 0 {
            let copied = self.write_into_buffer(&src_buf);
            src_buf = src_buf.slice(copied..);
        }
        // Then, if the buffer is full, flush it out
        if self.buf.len() == Self::CAPACITY {
            if let Err(e) = self.flush_buffer(ctx).await {
                return (Slice::into_inner(src_buf), Err(e));
            }
        }
        // Finally, write the tail of src_buf:
        // If it wholly fits into the buffer without
        // completely filling it, then put it there.
        // If not, write it out directly.
        let src_buf = if !src_buf.is_empty() {
            assert_eq!(self.buf.len(), 0);
            if src_buf.len() < Self::CAPACITY {
                let copied = self.write_into_buffer(&src_buf);
                // We just verified above that src_buf fits into our internal buffer.
                assert_eq!(copied, src_buf.len());
                Slice::into_inner(src_buf)
            } else {
                let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
                if let Err(e) = res {
                    return (src_buf, Err(e));
                }
                src_buf
            }
        } else {
            Slice::into_inner(src_buf)
        };
        (src_buf, Ok(()))
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        srcbuf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<u64, Error>) {
        self.write_blob_maybe_compressed(
            srcbuf,
            ctx,
            ImageCompressionAlgorithm::DisabledNoDecompress,
        )
        .await
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        srcbuf: B,
        ctx: &RequestContext,
        algorithm: ImageCompressionAlgorithm,
    ) -> (B::Buf, Result<u64, Error>) {
        let offset = self.offset;

        let len = srcbuf.bytes_init();

        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        let mut compressed_buf = None;
        let ((io_buf, hdr_res), srcbuf) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                (
                    self.write_all(io_buf, ctx).await,
                    srcbuf.slice_full().into_inner(),
                )
            } else {
                // Write a 4-byte length header
                if len > MAX_SUPPORTED_LEN {
                    return (
                        (
                            io_buf,
                            Err(Error::new(
                                ErrorKind::Other,
                                format!("blob too large ({len} bytes)"),
                            )),
                        ),
                        srcbuf.slice_full().into_inner(),
                    );
                }
                let (high_bit_mask, len_written, srcbuf) = match algorithm {
                    ImageCompressionAlgorithm::Zstd { level } => {
                        let mut encoder = if let Some(level) = level {
                            async_compression::tokio::write::ZstdEncoder::with_quality(
                                Vec::new(),
                                Level::Precise(level.into()),
                            )
                        } else {
                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
                        };
                        let slice = srcbuf.slice_full();
                        encoder.write_all(&slice[..]).await.unwrap();
                        encoder.shutdown().await.unwrap();
                        let compressed = encoder.into_inner();
                        if compressed.len() < len {
                            let compressed_len = compressed.len();
                            compressed_buf = Some(compressed);
                            (BYTE_ZSTD, compressed_len, slice.into_inner())
                        } else {
                            (BYTE_UNCOMPRESSED, len, slice.into_inner())
                        }
                    }
                    ImageCompressionAlgorithm::Disabled
                    | ImageCompressionAlgorithm::DisabledNoDecompress => {
                        (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
                    }
                };
                let mut len_buf = (len_written as u32).to_be_bytes();
                assert_eq!(len_buf[0] & 0xf0, 0);
                len_buf[0] |= high_bit_mask;
                io_buf.extend_from_slice(&len_buf[..]);
                (self.write_all(io_buf, ctx).await, srcbuf)
            }
        }
        .await;
        self.io_buf = Some(io_buf);
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
        }
        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
            let (_buf, res) = self.write_all(compressed_buf, ctx).await;
            (Slice::into_inner(srcbuf.slice(..)), res)
        } else {
            self.write_all(srcbuf, ctx).await
        };
        (srcbuf, res.map(|_| offset))
    }
}

impl BlobWriter<true> {
    /// Access the underlying `VirtualFile`.
    ///
    /// This function flushes the internal buffer before giving access
    /// to the underlying `VirtualFile`.
    pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
        self.flush_buffer(ctx).await?;
        Ok(self.inner)
    }

    /// Access the underlying `VirtualFile`.
    ///
    /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
    /// the internal buffer before giving access.
    pub fn into_inner_no_flush(self) -> VirtualFile {
        self.inner
    }
}

impl BlobWriter<false> {
    /// Access the underlying `VirtualFile`.
    pub fn into_inner(self) -> VirtualFile {
        self.inner
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        round_trip_test_compressed::<BUFFERED>(blobs, false).await
    }

    async fn round_trip_test_compressed<const BUFFERED: bool>(
        blobs: &[Vec<u8>],
        compression: bool,
    ) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in block to drop the file)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let (_, res) = if compression {
                    wtr.write_blob_maybe_compressed(
                        blob.clone(),
                        &ctx,
                        ImageCompressionAlgorithm::Zstd { level: Some(1) },
                    )
                    .await
                } else {
                    wtr.write_blob(blob.clone(), &ctx).await
                };
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer(&ctx).await?;
        }

        let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new_with_compression(rdr, compression);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        round_trip_test_compressed::<false>(blobs, true).await?;
        round_trip_test_compressed::<true>(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"hello".to_vec(),
            random_array(66 * PAGE_SZ),
            vec![0xf3; 24 * PAGE_SZ],
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        round_trip_test_compressed::<false>(blobs, true).await?;
        round_trip_test_compressed::<true>(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
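
    // Illustrative addition (a sketch following the pattern of the tests
    // above): round-trips blobs whose lengths straddle the 127/128 boundary,
    // where the on-disk length header switches from 1 byte to 4 bytes.
    #[tokio::test]
    async fn test_len_header_boundary() -> Result<(), Error> {
        let blobs = &[random_array(127), random_array(128), random_array(129)];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }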
}