//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length is
//! written as a single byte. If it's larger than that, the length is
//! written as a four-byte integer, in big-endian, with the high bit set.
//! This way, we can detect whether it's a 1- or 4-byte header by peeking
//! at the first byte. For blobs larger than 128 bytes, the next three
//! bits are reserved as well; only one of their bit patterns is currently
//! in use (0b001), and it signifies compression with zstd.
//!
//! len <  128: 0XXXXXXX
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
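//!
//! For example, a 100-byte blob is stored with the single header byte 0x64,
//! while an uncompressed 1000-byte blob gets the four header bytes
//! 0x80 0x00 0x03 0xE8 (high bit set, 1000 == 0x3E8). A zstd-compressed blob
//! stores the compressed length instead, and carries 0x9 rather than 0x8 in
//! the top nibble of the first header byte.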
//!
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};

impl<'a> BlockCursor<'a> {
    /// Read a blob into a new buffer.
    pub async fn read_blob(
        &self,
        offset: u64,
        ctx: &RequestContext,
    ) -> Result<Vec<u8>, std::io::Error> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf, ctx).await?;
        Ok(buf)
    }

    /// Read a blob into the given buffer. Any previous contents in the buffer
    /// are overwritten.
    pub async fn read_blob_into_buf(
        &self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
        ctx: &RequestContext,
    ) -> Result<(), std::io::Error> {
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum, ctx).await?;

        // Peek at the first byte, to determine if it's a 1- or 4-byte length header.
        let first_len_byte = buf[off];
        let len: usize = if first_len_byte < 0x80 {
            // 1-byte length header
            off += 1;
            first_len_byte as usize
        } else {
            // 4-byte length header
            let mut len_buf = [0u8; 4];
            let thislen = PAGE_SZ - off;
            if thislen < 4 {
                // it is split across two pages
                len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
                off = 4 - thislen;
            } else {
                len_buf.copy_from_slice(&buf[off..off + 4]);
                off += 4;
            }
            len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
            u32::from_be_bytes(len_buf) as usize
        };
        let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

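        // When the blob is compressed, read the raw payload into a temporary
        // buffer first and decompress it into `dstbuf` afterwards; otherwise
        // read directly into `dstbuf`.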
        let mut tmp_buf = Vec::new();
        let buf_to_write;
        let compression = if compression_bits <= BYTE_UNCOMPRESSED {
            buf_to_write = dstbuf;
            None
        } else if compression_bits == BYTE_ZSTD {
            buf_to_write = &mut tmp_buf;
            Some(dstbuf)
        } else {
            let error = std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("invalid compression byte {compression_bits:x}"),
            );
            return Err(error);
        };

        buf_to_write.clear();
        buf_to_write.reserve(len);

        // Read the payload
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // continue on next page
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }

        if let Some(dstbuf) = compression {
            if compression_bits == BYTE_ZSTD {
                let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf);
                decoder.write_all(buf_to_write).await?;
                decoder.flush().await?;
            } else {
                unreachable!("already checked above")
            }
        }

        Ok(())
    }
}

/// Reserved bits for length and compression
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
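// In the "1CCCXXXX ..." layout described at the top of this file,
// BYTE_UNCOMPRESSED is 0b1000_0000 (CCC = 000, no compression) and
// BYTE_ZSTD is 0b1001_0000 (CCC = 001, zstd).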

/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
    inner: VirtualFile,
    offset: u64,
    /// A buffer to save on write calls; only used if BUFFERED=true.
    buf: Vec<u8>,
    /// We do tiny writes for the length headers; they need to be in an owned buffer.
    io_buf: Option<BytesMut>,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
    pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
        Self {
            inner,
            offset: start_offset,
            buf: Vec::with_capacity(Self::CAPACITY),
            io_buf: Some(BytesMut::new()),
        }
    }

    pub fn size(&self) -> u64 {
        self.offset
    }

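    // Size of the internal buffer: 64 KiB when BUFFERED, and zero capacity
    // when unbuffered, in which case every write goes straight to the file.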
    const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };

    /// Writes the given buffer directly to the underlying `VirtualFile`.
    /// You need to make sure that the internal buffer is empty, otherwise
    /// data will be written in the wrong order.
    #[inline(always)]
    async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
        let nbytes = match res {
            Ok(nbytes) => nbytes,
            Err(e) => return (src_buf, Err(e)),
        };
        self.offset += nbytes as u64;
        (src_buf, Ok(()))
    }

    #[inline(always)]
    /// Flushes the internal buffer to the underlying `VirtualFile`.
    pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
        let buf = std::mem::take(&mut self.buf);
        let (mut buf, res) = self.inner.write_all(buf, ctx).await;
        res?;
        buf.clear();
        self.buf = buf;
        Ok(())
    }

    #[inline(always)]
    /// Writes as much of `src_buf` into the internal buffer as fits.
    fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
        let remaining = Self::CAPACITY - self.buf.len();
        let to_copy = src_buf.len().min(remaining);
        self.buf.extend_from_slice(&src_buf[..to_copy]);
        self.offset += to_copy as u64;
        to_copy
    }

    /// Internal, possibly buffered, write function.
    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        if !BUFFERED {
            assert!(self.buf.is_empty());
            return self.write_all_unbuffered(src_buf, ctx).await;
        }
        let remaining = Self::CAPACITY - self.buf.len();
        let src_buf_len = src_buf.bytes_init();
        if src_buf_len == 0 {
            return (Slice::into_inner(src_buf.slice_full()), Ok(()));
        }
        let mut src_buf = src_buf.slice(0..src_buf_len);
        // First try to copy as much as we can into the buffer
        if remaining > 0 {
            let copied = self.write_into_buffer(&src_buf);
            src_buf = src_buf.slice(copied..);
        }
        // Then, if the buffer is full, flush it out
        if self.buf.len() == Self::CAPACITY {
            if let Err(e) = self.flush_buffer(ctx).await {
                return (Slice::into_inner(src_buf), Err(e));
            }
        }
        // Finally, write the tail of src_buf:
        // If it wholly fits into the buffer without
        // completely filling it, then put it there.
        // If not, write it out directly.
        let src_buf = if !src_buf.is_empty() {
            assert_eq!(self.buf.len(), 0);
            if src_buf.len() < Self::CAPACITY {
                let copied = self.write_into_buffer(&src_buf);
                // We just verified above that src_buf fits into our internal buffer.
                assert_eq!(copied, src_buf.len());
                Slice::into_inner(src_buf)
            } else {
                let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
                if let Err(e) = res {
                    return (src_buf, Err(e));
                }
                src_buf
            }
        } else {
            Slice::into_inner(src_buf)
        };
        (src_buf, Ok(()))
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        srcbuf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<u64, Error>) {
        self.write_blob_maybe_compressed(srcbuf, ctx, None).await
    }

    /// Like [`write_blob`](Self::write_blob), but compresses the blob with
    /// the given algorithm if one is provided. Returns the offset that the
    /// blob was written to, which can be used to retrieve the data later.
    pub async fn write_blob_maybe_compressed<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        srcbuf: B,
        ctx: &RequestContext,
        algorithm: Option<ImageCompressionAlgorithm>,
    ) -> (B::Buf, Result<u64, Error>) {
        let offset = self.offset;

        let len = srcbuf.bytes_init();

        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        let mut compressed_buf = None;
        let ((io_buf, hdr_res), srcbuf) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                (
                    self.write_all(io_buf, ctx).await,
                    srcbuf.slice_full().into_inner(),
                )
            } else {
                // Write a 4-byte length header
                if len > MAX_SUPPORTED_LEN {
                    return (
                        (
                            io_buf,
                            Err(Error::new(
                                ErrorKind::Other,
                                format!("blob too large ({len} bytes)"),
                            )),
                        ),
                        srcbuf.slice_full().into_inner(),
                    );
                }
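                // Compress only when requested, and keep the compressed bytes
                // only if they are actually smaller than the input; otherwise
                // fall back to writing the data uncompressed.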
                let (high_bit_mask, len_written, srcbuf) = match algorithm {
                    Some(ImageCompressionAlgorithm::Zstd { level }) => {
                        let mut encoder = if let Some(level) = level {
                            async_compression::tokio::write::ZstdEncoder::with_quality(
                                Vec::new(),
                                Level::Precise(level.into()),
                            )
                        } else {
                            async_compression::tokio::write::ZstdEncoder::new(Vec::new())
                        };
                        let slice = srcbuf.slice_full();
                        encoder.write_all(&slice[..]).await.unwrap();
                        encoder.shutdown().await.unwrap();
                        let compressed = encoder.into_inner();
                        if compressed.len() < len {
                            let compressed_len = compressed.len();
                            compressed_buf = Some(compressed);
                            (BYTE_ZSTD, compressed_len, slice.into_inner())
                        } else {
                            (BYTE_UNCOMPRESSED, len, slice.into_inner())
                        }
                    }
                    None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
                };
                let mut len_buf = (len_written as u32).to_be_bytes();
                assert_eq!(len_buf[0] & LEN_COMPRESSION_BIT_MASK, 0);
                len_buf[0] |= high_bit_mask;
                io_buf.extend_from_slice(&len_buf[..]);
                (self.write_all(io_buf, ctx).await, srcbuf)
            }
        }
        .await;
        self.io_buf = Some(io_buf);
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
        }
        let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
            let (_buf, res) = self.write_all(compressed_buf, ctx).await;
            (Slice::into_inner(srcbuf.slice(..)), res)
        } else {
            self.write_all(srcbuf, ctx).await
        };
        (srcbuf, res.map(|_| offset))
    }
}

impl BlobWriter<true> {
    /// Access the underlying `VirtualFile`.
    ///
    /// This function flushes the internal buffer before giving access
    /// to the underlying `VirtualFile`.
    pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
        self.flush_buffer(ctx).await?;
        Ok(self.inner)
    }

    /// Access the underlying `VirtualFile`.
    ///
    /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
    /// the internal buffer before giving access.
    pub fn into_inner_no_flush(self) -> VirtualFile {
        self.inner
    }
}

impl BlobWriter<false> {
    /// Access the underlying `VirtualFile`.
    pub fn into_inner(self) -> VirtualFile {
        self.inner
    }
}
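
// A minimal usage sketch (hypothetical `file`, `ctx`, and `cursor` values,
// error handling elided): write blobs, remember the returned offsets, flush,
// and read them back later through `BlockCursor::read_blob`:
//
//     let mut writer = BlobWriter::<true>::new(file, 0);
//     let (_, res) = writer.write_blob(b"hello".to_vec(), &ctx).await;
//     let offset = res?;
//     let file = writer.into_inner(&ctx).await?;
//     // ... later, with a BlockCursor over the same file:
//     let blob = cursor.read_blob(offset, &ctx).await?;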

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        round_trip_test_compressed::<BUFFERED, 0>(blobs).await
    }

    async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
        blobs: &[Vec<u8>],
    ) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in a block, so that the file is dropped at the end)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let (_, res) = match COMPRESSION {
                    0 => wtr.write_blob(blob.clone(), &ctx).await,
                    1 => {
                        wtr.write_blob_maybe_compressed(
                            blob.clone(),
                            &ctx,
                            Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
                        )
                        .await
                    }
                    _ => unreachable!("Invalid compression {COMPRESSION}"),
                };
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer(&ctx).await?;
        }

        let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new(rdr);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
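
    // An extra check, sketched from the helpers above: small blobs take the
    // 1-byte header path even when compression is requested, so the
    // compressed round trip must reproduce them verbatim too.
    #[tokio::test]
    async fn test_hello_simple_compressed() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test_compressed::<false, 1>(blobs).await?;
        round_trip_test_compressed::<true, 1>(blobs).await?;
        Ok(())
    }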

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"hello".to_vec(),
            random_array(66 * PAGE_SZ),
            vec![0xf3; 24 * PAGE_SZ],
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        round_trip_test_compressed::<false, 1>(blobs).await?;
        round_trip_test_compressed::<true, 1>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
}