LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: f081ec316c96fa98335efd15ef501745aa4f015d.info Lines: 95.0 % 279 265
Test Date: 2024-06-25 15:11:17 Functions: 84.6 % 78 66

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as a one byte. If it's larger than that, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9              : //! by peeking at the first byte.
      10              : //!
      11              : //! len <  128: 0XXXXXXX
      12              : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      13              : //!
      14              : use bytes::{BufMut, BytesMut};
      15              : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
      16              : 
      17              : use crate::context::RequestContext;
      18              : use crate::page_cache::PAGE_SZ;
      19              : use crate::tenant::block_io::BlockCursor;
      20              : use crate::virtual_file::VirtualFile;
      21              : use std::cmp::min;
      22              : use std::io::{Error, ErrorKind};
      23              : 
      24              : impl<'a> BlockCursor<'a> {
      25              :     /// Read a blob into a new buffer.
      26      2598590 :     pub async fn read_blob(
      27      2598590 :         &self,
      28      2598590 :         offset: u64,
      29      2598590 :         ctx: &RequestContext,
      30      2598590 :     ) -> Result<Vec<u8>, std::io::Error> {
      31      2598590 :         let mut buf = Vec::new();
      32      2598590 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      33      2598590 :         Ok(buf)
      34      2598590 :     }
      35              :     /// Read blob into the given buffer. Any previous contents in the buffer
      36              :     /// are overwritten.
      37      7105920 :     pub async fn read_blob_into_buf(
      38      7105920 :         &self,
      39      7105920 :         offset: u64,
      40      7105920 :         dstbuf: &mut Vec<u8>,
      41      7105920 :         ctx: &RequestContext,
      42      7105920 :     ) -> Result<(), std::io::Error> {
      43      7105920 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      44      7105920 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      45              : 
      46      7105920 :         let mut buf = self.read_blk(blknum, ctx).await?;
      47              : 
      48              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      49      7105920 :         let first_len_byte = buf[off];
      50      7105920 :         let len: usize = if first_len_byte < 0x80 {
      51              :             // 1-byte length header
      52      7091118 :             off += 1;
      53      7091118 :             first_len_byte as usize
      54              :         } else {
      55              :             // 4-byte length header
      56        14802 :             let mut len_buf = [0u8; 4];
      57        14802 :             let thislen = PAGE_SZ - off;
      58        14802 :             if thislen < 4 {
      59              :                 // it is split across two pages
      60            4 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      61            4 :                 blknum += 1;
      62            4 :                 buf = self.read_blk(blknum, ctx).await?;
      63            4 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      64            4 :                 off = 4 - thislen;
      65        14798 :             } else {
      66        14798 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      67        14798 :                 off += 4;
      68        14798 :             }
      69        14802 :             len_buf[0] &= 0x7f;
      70        14802 :             u32::from_be_bytes(len_buf) as usize
      71              :         };
      72              : 
      73      7105920 :         dstbuf.clear();
      74      7105920 :         dstbuf.reserve(len);
      75      7105920 : 
      76      7105920 :         // Read the payload
      77      7105920 :         let mut remain = len;
      78     14294569 :         while remain > 0 {
      79      7188649 :             let mut page_remain = PAGE_SZ - off;
      80      7188649 :             if page_remain == 0 {
      81              :                 // continue on next page
      82        83414 :                 blknum += 1;
      83        83414 :                 buf = self.read_blk(blknum, ctx).await?;
      84        83414 :                 off = 0;
      85        83414 :                 page_remain = PAGE_SZ;
      86      7105235 :             }
      87      7188649 :             let this_blk_len = min(remain, page_remain);
      88      7188649 :             dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
      89      7188649 :             remain -= this_blk_len;
      90      7188649 :             off += this_blk_len;
      91              :         }
      92      7105920 :         Ok(())
      93      7105920 :     }
      94              : }
      95              : 
      96              : /// A wrapper of `VirtualFile` that allows users to write blobs.
      97              : ///
      98              : /// If a `BlobWriter` is dropped, the internal buffer will be
      99              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
     100              : /// manually before dropping.
     101              : pub struct BlobWriter<const BUFFERED: bool> {
     102              :     inner: VirtualFile,
     103              :     offset: u64,
     104              :     /// A buffer to save on write calls, only used if BUFFERED=true
     105              :     buf: Vec<u8>,
     106              :     /// We do tiny writes for the length headers; they need to be in an owned buffer;
     107              :     io_buf: Option<BytesMut>,
     108              : }
     109              : 
     110              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     111         1588 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     112         1588 :         Self {
     113         1588 :             inner,
     114         1588 :             offset: start_offset,
     115         1588 :             buf: Vec::with_capacity(Self::CAPACITY),
     116         1588 :             io_buf: Some(BytesMut::new()),
     117         1588 :         }
     118         1588 :     }
     119              : 
     120      2025530 :     pub fn size(&self) -> u64 {
     121      2025530 :         self.offset
     122      2025530 :     }
     123              : 
     124              :     const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
     125              : 
     126              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     127              :     /// You need to make sure that the internal buffer is empty, otherwise
     128              :     /// data will be written in wrong order.
     129              :     #[inline(always)]
     130      1079850 :     async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     131      1079850 :         &mut self,
     132      1079850 :         src_buf: B,
     133      1079850 :         ctx: &RequestContext,
     134      1079850 :     ) -> (B::Buf, Result<(), Error>) {
     135      1079850 :         let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
     136      1079850 :         let nbytes = match res {
     137      1079850 :             Ok(nbytes) => nbytes,
     138            0 :             Err(e) => return (src_buf, Err(e)),
     139              :         };
     140      1079850 :         self.offset += nbytes as u64;
     141      1079850 :         (src_buf, Ok(()))
     142      1079850 :     }
     143              : 
     144              :     #[inline(always)]
     145              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     146        10142 :     pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
     147        10142 :         let buf = std::mem::take(&mut self.buf);
     148        10142 :         let (mut buf, res) = self.inner.write_all(buf, ctx).await;
     149        10142 :         res?;
     150        10142 :         buf.clear();
     151        10142 :         self.buf = buf;
     152        10142 :         Ok(())
     153        10142 :     }
     154              : 
     155              :     #[inline(always)]
     156              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     157     12918320 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     158     12918320 :         let remaining = Self::CAPACITY - self.buf.len();
     159     12918320 :         let to_copy = src_buf.len().min(remaining);
     160     12918320 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     161     12918320 :         self.offset += to_copy as u64;
     162     12918320 :         to_copy
     163     12918320 :     }
     164              : 
     165              :     /// Internal, possibly buffered, write function
     166     13989436 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     167     13989436 :         &mut self,
     168     13989436 :         src_buf: B,
     169     13989436 :         ctx: &RequestContext,
     170     13989436 :     ) -> (B::Buf, Result<(), Error>) {
     171     13989436 :         if !BUFFERED {
     172      1079632 :             assert!(self.buf.is_empty());
     173      1079632 :             return self.write_all_unbuffered(src_buf, ctx).await;
     174     12909804 :         }
     175     12909804 :         let remaining = Self::CAPACITY - self.buf.len();
     176     12909804 :         let src_buf_len = src_buf.bytes_init();
     177     12909804 :         if src_buf_len == 0 {
     178           18 :             return (Slice::into_inner(src_buf.slice_full()), Ok(()));
     179     12909786 :         }
     180     12909786 :         let mut src_buf = src_buf.slice(0..src_buf_len);
     181     12909786 :         // First try to copy as much as we can into the buffer
     182     12909786 :         if remaining > 0 {
     183     12909786 :             let copied = self.write_into_buffer(&src_buf);
     184     12909786 :             src_buf = src_buf.slice(copied..);
     185     12909786 :         }
     186              :         // Then, if the buffer is full, flush it out
     187     12909786 :         if self.buf.len() == Self::CAPACITY {
     188         8792 :             if let Err(e) = self.flush_buffer(ctx).await {
     189            0 :                 return (Slice::into_inner(src_buf), Err(e));
     190         8792 :             }
     191     12900994 :         }
     192              :         // Finally, write the tail of src_buf:
     193              :         // If it wholly fits into the buffer without
     194              :         // completely filling it, then put it there.
     195              :         // If not, write it out directly.
     196     12909786 :         let src_buf = if !src_buf.is_empty() {
     197         8752 :             assert_eq!(self.buf.len(), 0);
     198         8752 :             if src_buf.len() < Self::CAPACITY {
     199         8534 :                 let copied = self.write_into_buffer(&src_buf);
     200         8534 :                 // We just verified above that src_buf fits into our internal buffer.
     201         8534 :                 assert_eq!(copied, src_buf.len());
     202         8534 :                 Slice::into_inner(src_buf)
     203              :             } else {
     204          218 :                 let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
     205          218 :                 if let Err(e) = res {
     206            0 :                     return (src_buf, Err(e));
     207          218 :                 }
     208          218 :                 src_buf
     209              :             }
     210              :         } else {
     211     12901034 :             Slice::into_inner(src_buf)
     212              :         };
     213     12909786 :         (src_buf, Ok(()))
     214     13989436 :     }
     215              : 
     216              :     /// Write a blob of data. Returns the offset that it was written to,
     217              :     /// which can be used to retrieve the data later.
     218      6994718 :     pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     219      6994718 :         &mut self,
     220      6994718 :         srcbuf: B,
     221      6994718 :         ctx: &RequestContext,
     222      6994718 :     ) -> (B::Buf, Result<u64, Error>) {
     223      6994718 :         let offset = self.offset;
     224      6994718 : 
     225      6994718 :         let len = srcbuf.bytes_init();
     226      6994718 : 
     227      6994718 :         let mut io_buf = self.io_buf.take().expect("we always put it back below");
     228      6994718 :         io_buf.clear();
     229      6994718 :         let (io_buf, hdr_res) = async {
     230      6994718 :             if len < 128 {
     231      6994718 :                 // Short blob. Write a 1-byte length header
     232      6994718 :                 io_buf.put_u8(len as u8);
     233      6982466 :                 self.write_all(io_buf, ctx).await
     234      6994718 :             } else {
     235      6994718 :                 // Write a 4-byte length header
     236      6994718 :                 if len > 0x7fff_ffff {
     237      6994718 :                     return (
     238            0 :                         io_buf,
     239            0 :                         Err(Error::new(
     240            0 :                             ErrorKind::Other,
     241            0 :                             format!("blob too large ({len} bytes)"),
     242            0 :                         )),
     243            0 :                     );
     244      6994718 :                 }
     245        12252 :                 if len > 0x0fff_ffff {
     246      6994718 :                     tracing::warn!("writing blob above future limit ({len} bytes)");
     247      6994718 :                 }
     248      6994718 :                 let mut len_buf = (len as u32).to_be_bytes();
     249        12252 :                 len_buf[0] |= 0x80;
     250        12252 :                 io_buf.extend_from_slice(&len_buf[..]);
     251        12252 :                 self.write_all(io_buf, ctx).await
     252      6994718 :             }
     253      6994718 :         }
     254       274979 :         .await;
     255      6994718 :         self.io_buf = Some(io_buf);
     256      6994718 :         match hdr_res {
     257      6994718 :             Ok(_) => (),
     258            0 :             Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
     259              :         }
     260      6994718 :         let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
     261      6994718 :         (srcbuf, res.map(|_| offset))
     262      6994718 :     }
     263              : }
     264              : 
     265              : impl BlobWriter<true> {
     266              :     /// Access the underlying `VirtualFile`.
     267              :     ///
     268              :     /// This function flushes the internal buffer before giving access
     269              :     /// to the underlying `VirtualFile`.
     270         1326 :     pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
     271         1326 :         self.flush_buffer(ctx).await?;
     272         1326 :         Ok(self.inner)
     273         1326 :     }
     274              : 
     275              :     /// Access the underlying `VirtualFile`.
     276              :     ///
     277              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     278              :     /// the internal buffer before giving access.
     279            0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     280            0 :         self.inner
     281            0 :     }
     282              : }
     283              : 
     284              : impl BlobWriter<false> {
     285              :     /// Access the underlying `VirtualFile`.
     286          238 :     pub fn into_inner(self) -> VirtualFile {
     287          238 :         self.inner
     288          238 :     }
     289              : }
     290              : 
     291              : #[cfg(test)]
     292              : mod tests {
     293              :     use super::*;
     294              :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     295              :     use rand::{Rng, SeedableRng};
     296              : 
     297           24 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     298           24 :         let temp_dir = camino_tempfile::tempdir()?;
     299           24 :         let pathbuf = temp_dir.path().join("file");
     300           24 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     301           24 : 
     302           24 :         // Write part (in block to drop the file)
     303           24 :         let mut offsets = Vec::new();
     304              :         {
     305           24 :             let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
     306           24 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     307         8236 :             for blob in blobs.iter() {
     308         8236 :                 let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
     309         8236 :                 let offs = res?;
     310         8236 :                 offsets.push(offs);
     311              :             }
     312              :             // Write out one page worth of zeros so that we can
     313              :             // read again with read_blk
     314           24 :             let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
     315           24 :             let offs = res?;
     316           24 :             println!("Writing final blob at offs={offs}");
     317           24 :             wtr.flush_buffer(&ctx).await?;
     318              :         }
     319              : 
     320           24 :         let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
     321           24 :         let rdr = BlockReaderRef::VirtualFile(&file);
     322           24 :         let rdr = BlockCursor::new(rdr);
     323         8236 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     324        10255 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     325         8236 :             assert_eq!(
     326         8236 :                 blob, &blob_read,
     327            0 :                 "mismatch for idx={idx} at offset={offset}"
     328              :             );
     329              :         }
     330           24 :         Ok(())
     331           24 :     }
     332              : 
     333         4104 :     fn random_array(len: usize) -> Vec<u8> {
     334         4104 :         let mut rng = rand::thread_rng();
     335     49064544 :         (0..len).map(|_| rng.gen()).collect::<_>()
     336         4104 :     }
     337              : 
     338              :     #[tokio::test]
     339            2 :     async fn test_one() -> Result<(), Error> {
     340            2 :         let blobs = &[vec![12, 21, 22]];
     341            8 :         round_trip_test::<false>(blobs).await?;
     342            4 :         round_trip_test::<true>(blobs).await?;
     343            2 :         Ok(())
     344            2 :     }
     345              : 
     346              :     #[tokio::test]
     347            2 :     async fn test_hello_simple() -> Result<(), Error> {
     348            2 :         let blobs = &[
     349            2 :             vec![0, 1, 2, 3],
     350            2 :             b"Hello, World!".to_vec(),
     351            2 :             Vec::new(),
     352            2 :             b"foobar".to_vec(),
     353            2 :         ];
     354           16 :         round_trip_test::<false>(blobs).await?;
     355            7 :         round_trip_test::<true>(blobs).await?;
     356            2 :         Ok(())
     357            2 :     }
     358              : 
     359              :     #[tokio::test]
     360            2 :     async fn test_really_big_array() -> Result<(), Error> {
     361            2 :         let blobs = &[
     362            2 :             b"test".to_vec(),
     363            2 :             random_array(10 * PAGE_SZ),
     364            2 :             b"foobar".to_vec(),
     365            2 :         ];
     366           24 :         round_trip_test::<false>(blobs).await?;
     367           17 :         round_trip_test::<true>(blobs).await?;
     368            2 :         Ok(())
     369            2 :     }
     370              : 
     371              :     #[tokio::test]
     372            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     373            2 :         let blobs = (0..PAGE_SZ / 8)
     374         2048 :             .map(|v| random_array(v * 16))
     375            2 :             .collect::<Vec<_>>();
     376         4162 :         round_trip_test::<false>(&blobs).await?;
     377         2212 :         round_trip_test::<true>(&blobs).await?;
     378            2 :         Ok(())
     379            2 :     }
     380              : 
     381              :     #[tokio::test]
     382            2 :     async fn test_arrays_random_size() -> Result<(), Error> {
     383            2 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     384            2 :         let blobs = (0..1024)
     385         2048 :             .map(|_| {
     386         2048 :                 let mut sz: u16 = rng.gen();
     387         2048 :                 // Make 50% of the arrays small
     388         2048 :                 if rng.gen() {
     389         1032 :                     sz &= 63;
     390         1032 :                 }
     391         2048 :                 random_array(sz.into())
     392         2048 :             })
     393            2 :             .collect::<Vec<_>>();
     394         5106 :         round_trip_test::<false>(&blobs).await?;
     395         3279 :         round_trip_test::<true>(&blobs).await?;
     396            2 :         Ok(())
     397            2 :     }
     398              : 
     399              :     #[tokio::test]
     400            2 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     401            2 :         let blobs = &[
     402            2 :             random_array(PAGE_SZ - 4),
     403            2 :             random_array(PAGE_SZ - 4),
     404            2 :             random_array(PAGE_SZ - 4),
     405            2 :         ];
     406           14 :         round_trip_test::<false>(blobs).await?;
     407            6 :         round_trip_test::<true>(blobs).await?;
     408            2 :         Ok(())
     409            2 :     }
     410              : }
        

Generated by: LCOV version 2.1-beta