LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 96.4 % 225 217 8 217
Current Date: 2023-10-19 02:04:12 Functions: 96.0 % 50 48 2 5 43 5
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Functions for reading and writing variable-sized "blobs".
       3                 : //!
       4                 : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5                 : //! actual data. If the length is smaller than 128 bytes, the length
       6                 : //! is written as a one byte. If it's larger than that, the length
       7                 : //! is written as a four-byte integer, in big-endian, with the high
       8                 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
       9                 : //! by peeking at the first byte.
      10                 : //!
      11                 : //! len <  128: 0XXXXXXX
      12                 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      13                 : //!
      14                 : use crate::context::RequestContext;
      15                 : use crate::page_cache::PAGE_SZ;
      16                 : use crate::tenant::block_io::BlockCursor;
      17                 : use crate::virtual_file::VirtualFile;
      18                 : use std::cmp::min;
      19                 : use std::io::{Error, ErrorKind};
      20                 : 
      21                 : impl<'a> BlockCursor<'a> {
      22                 :     /// Read a blob into a new buffer.
      23 CBC    61295721 :     pub async fn read_blob(
      24        61295721 :         &self,
      25        61295721 :         offset: u64,
      26        61295721 :         ctx: &RequestContext,
      27        61295721 :     ) -> Result<Vec<u8>, std::io::Error> {
      28        61295719 :         let mut buf = Vec::new();
      29        61295719 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      30        61295716 :         Ok(buf)
      31        61295716 :     }
      32                 :     /// Read blob into the given buffer. Any previous contents in the buffer
      33                 :     /// are overwritten.
      34       245880840 :     pub async fn read_blob_into_buf(
      35       245880840 :         &self,
      36       245880840 :         offset: u64,
      37       245880840 :         dstbuf: &mut Vec<u8>,
      38       245880840 :         ctx: &RequestContext,
      39       245880840 :     ) -> Result<(), std::io::Error> {
      40       245880828 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      41       245880828 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      42                 : 
      43       245880828 :         let mut buf = self.read_blk(blknum, ctx).await?;
      44                 : 
      45                 :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      46       245880822 :         let first_len_byte = buf[off];
      47       245880822 :         let len: usize = if first_len_byte < 0x80 {
      48                 :             // 1-byte length header
      49       155608083 :             off += 1;
      50       155608083 :             first_len_byte as usize
      51                 :         } else {
      52                 :             // 4-byte length header
      53        90272739 :             let mut len_buf = [0u8; 4];
      54        90272739 :             let thislen = PAGE_SZ - off;
      55        90272739 :             if thislen < 4 {
      56                 :                 // it is split across two pages
      57           29488 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      58           29488 :                 blknum += 1;
      59           29488 :                 buf = self.read_blk(blknum, ctx).await?;
      60           29488 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      61           29488 :                 off = 4 - thislen;
      62        90243251 :             } else {
      63        90243251 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      64        90243251 :                 off += 4;
      65        90243251 :             }
      66        90272739 :             len_buf[0] &= 0x7f;
      67        90272739 :             u32::from_be_bytes(len_buf) as usize
      68                 :         };
      69                 : 
      70       245880822 :         dstbuf.clear();
      71       245880822 :         dstbuf.reserve(len);
      72       245880822 : 
      73       245880822 :         // Read the payload
      74       245880822 :         let mut remain = len;
      75       500376991 :         while remain > 0 {
      76       254496171 :             let mut page_remain = PAGE_SZ - off;
      77       254496171 :             if page_remain == 0 {
      78                 :                 // continue on next page
      79         8645372 :                 blknum += 1;
      80         8645372 :                 buf = self.read_blk(blknum, ctx).await?;
      81         8645370 :                 off = 0;
      82         8645370 :                 page_remain = PAGE_SZ;
      83       245850799 :             }
      84       254496169 :             let this_blk_len = min(remain, page_remain);
      85       254496169 :             dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
      86       254496169 :             remain -= this_blk_len;
      87       254496169 :             off += this_blk_len;
      88                 :         }
      89       245880820 :         Ok(())
      90       245880820 :     }
      91                 : }
      92                 : 
      93                 : /// A wrapper of `VirtualFile` that allows users to write blobs.
      94                 : ///
      95                 : /// If a `BlobWriter` is dropped, the internal buffer will be
      96                 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
      97                 : /// manually before dropping.
      98                 : pub struct BlobWriter<const BUFFERED: bool> {
      99                 :     inner: VirtualFile,
     100                 :     offset: u64,
     101                 :     /// A buffer to save on write calls, only used if BUFFERED=true
     102                 :     buf: Vec<u8>,
     103                 : }
     104                 : 
     105                 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     106           19146 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     107           19146 :         Self {
     108           19146 :             inner,
     109           19146 :             offset: start_offset,
     110           19146 :             buf: Vec::with_capacity(Self::CAPACITY),
     111           19146 :         }
     112           19146 :     }
     113                 : 
     114         1889824 :     pub fn size(&self) -> u64 {
     115         1889824 :         self.offset
     116         1889824 :     }
     117                 : 
     118                 :     const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
     119                 : 
     120                 :     #[inline(always)]
     121                 :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     122                 :     /// You need to make sure that the internal buffer is empty, otherwise
     123                 :     /// data will be written in wrong order.
     124          457790 :     async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
     125          457790 :         self.inner.write_all(src_buf).await?;
     126          457790 :         self.offset += src_buf.len() as u64;
     127          457790 :         Ok(())
     128          457790 :     }
     129                 : 
     130                 :     #[inline(always)]
     131                 :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     132         4792104 :     pub async fn flush_buffer(&mut self) -> Result<(), Error> {
     133         4792104 :         self.inner.write_all(&self.buf).await?;
     134         4792103 :         self.buf.clear();
     135         4792103 :         Ok(())
     136         4792103 :     }
     137                 : 
     138                 :     #[inline(always)]
     139                 :     /// Writes as much of `src_buf` into the internal buffer as it fits
     140       170217609 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     141       170217609 :         let remaining = Self::CAPACITY - self.buf.len();
     142       170217609 :         let to_copy = src_buf.len().min(remaining);
     143       170217609 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     144       170217609 :         self.offset += to_copy as u64;
     145       170217609 :         to_copy
     146       170217609 :     }
     147                 : 
     148                 :     /// Internal, possibly buffered, write function
     149       165916608 :     async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
     150       165916608 :         if !BUFFERED {
     151          452374 :             assert!(self.buf.is_empty());
     152          452374 :             self.write_all_unbuffered(src_buf).await?;
     153          452374 :             return Ok(());
     154       165464234 :         }
     155       165464234 :         let remaining = Self::CAPACITY - self.buf.len();
     156       165464234 :         // First try to copy as much as we can into the buffer
     157       165464234 :         if remaining > 0 {
     158       165464234 :             let copied = self.write_into_buffer(src_buf);
     159       165464234 :             src_buf = &src_buf[copied..];
     160       165464234 :         }
     161                 :         // Then, if the buffer is full, flush it out
     162       165464234 :         if self.buf.len() == Self::CAPACITY {
     163         4776371 :             self.flush_buffer().await?;
     164       160687863 :         }
     165                 :         // Finally, write the tail of src_buf:
     166                 :         // If it wholly fits into the buffer without
     167                 :         // completely filling it, then put it there.
     168                 :         // If not, write it out directly.
     169       165464233 :         if !src_buf.is_empty() {
     170         4758791 :             assert_eq!(self.buf.len(), 0);
     171         4758791 :             if src_buf.len() < Self::CAPACITY {
     172         4753375 :                 let copied = self.write_into_buffer(src_buf);
     173         4753375 :                 // We just verified above that src_buf fits into our internal buffer.
     174         4753375 :                 assert_eq!(copied, src_buf.len());
     175                 :             } else {
     176            5416 :                 self.write_all_unbuffered(src_buf).await?;
     177                 :             }
     178       160705442 :         }
     179       165464233 :         Ok(())
     180       165916607 :     }
     181                 : 
     182                 :     /// Write a blob of data. Returns the offset that it was written to,
     183                 :     /// which can be used to retrieve the data later.
     184        82958304 :     pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
     185        82958304 :         let offset = self.offset;
     186        82958304 : 
     187        82958304 :         if srcbuf.len() < 128 {
     188                 :             // Short blob. Write a 1-byte length header
     189        51721507 :             let len_buf = srcbuf.len() as u8;
     190        51721507 :             self.write_all(&[len_buf]).await?;
     191                 :         } else {
     192                 :             // Write a 4-byte length header
     193        31236797 :             if srcbuf.len() > 0x7fff_ffff {
     194 UBC           0 :                 return Err(Error::new(
     195               0 :                     ErrorKind::Other,
     196               0 :                     format!("blob too large ({} bytes)", srcbuf.len()),
     197               0 :                 ));
     198 CBC    31236797 :             }
     199        31236797 :             let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
     200        31236797 :             len_buf[0] |= 0x80;
     201        31236797 :             self.write_all(&len_buf).await?;
     202                 :         }
     203        82958304 :         self.write_all(srcbuf).await?;
     204        82958303 :         Ok(offset)
     205        82958303 :     }
     206                 : }
     207                 : 
     208                 : impl BlobWriter<true> {
     209                 :     /// Access the underlying `VirtualFile`.
     210                 :     ///
     211                 :     /// This function flushes the internal buffer before giving access
     212                 :     /// to the underlying `VirtualFile`.
     213           15721 :     pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
     214           15721 :         self.flush_buffer().await?;
     215           15721 :         Ok(self.inner)
     216           15721 :     }
     217                 : 
     218                 :     /// Access the underlying `VirtualFile`.
     219                 :     ///
     220                 :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     221                 :     /// the internal buffer before giving access.
     222 UBC           0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     223               0 :         self.inner
     224               0 :     }
     225                 : }
     226                 : 
     227                 : impl BlobWriter<false> {
     228                 :     /// Access the underlying `VirtualFile`.
     229 CBC        3402 :     pub fn into_inner(self) -> VirtualFile {
     230            3402 :         self.inner
     231            3402 :     }
     232                 : }
     233                 : 
     234                 : #[cfg(test)]
     235                 : mod tests {
     236                 :     use super::*;
     237                 :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     238                 :     use rand::{Rng, SeedableRng};
     239                 : 
     240              12 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     241              12 :         let temp_dir = camino_tempfile::tempdir()?;
     242              12 :         let pathbuf = temp_dir.path().join("file");
     243              12 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     244              12 : 
     245              12 :         // Write part (in block to drop the file)
     246              12 :         let mut offsets = Vec::new();
     247                 :         {
     248              12 :             let file = VirtualFile::create(pathbuf.as_path()).await?;
     249              12 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     250            4118 :             for blob in blobs.iter() {
     251            4118 :                 let offs = wtr.write_blob(blob).await?;
     252            4118 :                 offsets.push(offs);
     253                 :             }
     254                 :             // Write out one page worth of zeros so that we can
     255                 :             // read again with read_blk
     256              12 :             let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
     257              12 :             println!("Writing final blob at offs={offs}");
     258              12 :             wtr.flush_buffer().await?;
     259                 :         }
     260                 : 
     261              12 :         let file = VirtualFile::open(pathbuf.as_path()).await?;
     262              12 :         let rdr = BlockReaderRef::VirtualFile(&file);
     263              12 :         let rdr = BlockCursor::new(rdr);
     264            4118 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     265            4118 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     266            4118 :             assert_eq!(
     267            4118 :                 blob, &blob_read,
     268 UBC           0 :                 "mismatch for idx={idx} at offset={offset}"
     269                 :             );
     270                 :         }
     271 CBC          12 :         Ok(())
     272              12 :     }
     273                 : 
     274            2052 :     fn random_array(len: usize) -> Vec<u8> {
     275            2052 :         let mut rng = rand::thread_rng();
     276        40965728 :         (0..len).map(|_| rng.gen()).collect::<_>()
     277            2052 :     }
     278                 : 
     279               1 :     #[tokio::test]
     280               1 :     async fn test_one() -> Result<(), Error> {
     281               1 :         let blobs = &[vec![12, 21, 22]];
     282               1 :         round_trip_test::<false>(blobs).await?;
     283               1 :         round_trip_test::<true>(blobs).await?;
     284               1 :         Ok(())
     285                 :     }
     286                 : 
     287               1 :     #[tokio::test]
     288               1 :     async fn test_hello_simple() -> Result<(), Error> {
     289               1 :         let blobs = &[
     290               1 :             vec![0, 1, 2, 3],
     291               1 :             b"Hello, World!".to_vec(),
     292               1 :             Vec::new(),
     293               1 :             b"foobar".to_vec(),
     294               1 :         ];
     295               1 :         round_trip_test::<false>(blobs).await?;
     296               1 :         round_trip_test::<true>(blobs).await?;
     297               1 :         Ok(())
     298                 :     }
     299                 : 
     300               1 :     #[tokio::test]
     301               1 :     async fn test_really_big_array() -> Result<(), Error> {
     302               1 :         let blobs = &[
     303               1 :             b"test".to_vec(),
     304               1 :             random_array(10 * PAGE_SZ),
     305               1 :             b"foobar".to_vec(),
     306               1 :         ];
     307               1 :         round_trip_test::<false>(blobs).await?;
     308               1 :         round_trip_test::<true>(blobs).await?;
     309               1 :         Ok(())
     310                 :     }
     311                 : 
     312               1 :     #[tokio::test]
     313               1 :     async fn test_arrays_inc() -> Result<(), Error> {
     314               1 :         let blobs = (0..PAGE_SZ / 8)
     315            1024 :             .map(|v| random_array(v * 16))
     316               1 :             .collect::<Vec<_>>();
     317               1 :         round_trip_test::<false>(&blobs).await?;
     318               1 :         round_trip_test::<true>(&blobs).await?;
     319               1 :         Ok(())
     320                 :     }
     321                 : 
     322               1 :     #[tokio::test]
     323               1 :     async fn test_arrays_random_size() -> Result<(), Error> {
     324               1 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     325               1 :         let blobs = (0..1024)
     326            1024 :             .map(|_| {
     327            1024 :                 let mut sz: u16 = rng.gen();
     328            1024 :                 // Make 50% of the arrays small
     329            1024 :                 if rng.gen() {
     330             516 :                     sz |= 63;
     331             516 :                 }
     332            1024 :                 random_array(sz.into())
     333            1024 :             })
     334               1 :             .collect::<Vec<_>>();
     335               1 :         round_trip_test::<false>(&blobs).await?;
     336               1 :         round_trip_test::<true>(&blobs).await?;
     337               1 :         Ok(())
     338                 :     }
     339                 : 
     340               1 :     #[tokio::test]
     341               1 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     342               1 :         let blobs = &[
     343               1 :             random_array(PAGE_SZ - 4),
     344               1 :             random_array(PAGE_SZ - 4),
     345               1 :             random_array(PAGE_SZ - 4),
     346               1 :         ];
     347               1 :         round_trip_test::<false>(blobs).await?;
     348               1 :         round_trip_test::<true>(blobs).await?;
     349               1 :         Ok(())
     350                 :     }
     351                 : }
        

Generated by: LCOV version 2.1-beta