LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 96.4 % 225 217
Test Date: 2024-02-07 07:37:29 Functions: 96.0 % 50 48

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as a single byte. Otherwise, the length is written as
       7              : //! a four-byte big-endian integer with the high bit set. This way, we
       8              : //! can tell whether the header is 1 or 4 bytes long by peeking at the
       9              : //! first byte.
      10              : //!
      11              : //! len <  128: 0XXXXXXX
      12              : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      13              : //!
      14              : use crate::context::RequestContext;
      15              : use crate::page_cache::PAGE_SZ;
      16              : use crate::tenant::block_io::BlockCursor;
      17              : use crate::virtual_file::VirtualFile;
      18              : use std::cmp::min;
      19              : use std::io::{Error, ErrorKind};
      20              : 
      21              : impl<'a> BlockCursor<'a> {
      22              :     /// Read a blob into a new buffer.
      23     53535319 :     pub async fn read_blob(
      24     53535319 :         &self,
      25     53535319 :         offset: u64,
      26     53535319 :         ctx: &RequestContext,
      27     53535319 :     ) -> Result<Vec<u8>, std::io::Error> {
      28     53535319 :         let mut buf = Vec::new();
      29     53535319 :         self.read_blob_into_buf(offset, &mut buf, ctx).await?;
      30     53535319 :         Ok(buf)
      31     53535319 :     }
      32              :     /// Read blob into the given buffer. Any previous contents in the buffer
      33              :     /// are overwritten.
      34    139016425 :     pub async fn read_blob_into_buf(
      35    139016425 :         &self,
      36    139016425 :         offset: u64,
      37    139016425 :         dstbuf: &mut Vec<u8>,
      38    139016425 :         ctx: &RequestContext,
      39    139016520 :     ) -> Result<(), std::io::Error> {
      40    139016520 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      41    139016520 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      42              : 
      43    139016520 :         let mut buf = self.read_blk(blknum, ctx).await?;
      44              : 
      45              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      46    139016519 :         let first_len_byte = buf[off];
      47    139016519 :         let len: usize = if first_len_byte < 0x80 {
      48              :             // 1-byte length header
      49    102715024 :             off += 1;
      50    102715024 :             first_len_byte as usize
      51              :         } else {
      52              :             // 4-byte length header
      53     36301495 :             let mut len_buf = [0u8; 4];
      54     36301495 :             let thislen = PAGE_SZ - off;
      55     36301495 :             if thislen < 4 {
      56              :                 // it is split across two pages
      57        11768 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      58        11768 :                 blknum += 1;
      59        11768 :                 buf = self.read_blk(blknum, ctx).await?;
      60        11768 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      61        11768 :                 off = 4 - thislen;
      62     36289727 :             } else {
      63     36289727 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      64     36289727 :                 off += 4;
      65     36289727 :             }
      66     36301495 :             len_buf[0] &= 0x7f;
      67     36301495 :             u32::from_be_bytes(len_buf) as usize
      68              :         };
      69              : 
      70    139016519 :         dstbuf.clear();
      71    139016519 :         dstbuf.reserve(len);
      72    139016519 : 
      73    139016519 :         // Read the payload
      74    139016519 :         let mut remain = len;
      75    286731462 :         while remain > 0 {
      76    147714943 :             let mut page_remain = PAGE_SZ - off;
      77    147714943 :             if page_remain == 0 {
      78              :                 // continue on next page
      79      8713660 :                 blknum += 1;
      80      8713660 :                 buf = self.read_blk(blknum, ctx).await?;
      81      8713660 :                 off = 0;
      82      8713660 :                 page_remain = PAGE_SZ;
      83    139001283 :             }
      84    147714943 :             let this_blk_len = min(remain, page_remain);
      85    147714943 :             dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
      86    147714943 :             remain -= this_blk_len;
      87    147714943 :             off += this_blk_len;
      88              :         }
      89    139016519 :         Ok(())
      90    139016519 :     }
      91              : }
      92              : 
      93              : /// A wrapper of `VirtualFile` that allows users to write blobs.
      94              : ///
      95              : /// If a `BlobWriter` is dropped, the internal buffer will be
      96              : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
      97              : /// manually before dropping.
      98              : pub struct BlobWriter<const BUFFERED: bool> {
      99              :     inner: VirtualFile,
     100              :     offset: u64,
     101              :     /// A buffer to save on write calls, only used if BUFFERED=true
     102              :     buf: Vec<u8>,
     103              : }
     104              : 
     105              : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     106        21971 :     pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
     107        21971 :         Self {
     108        21971 :             inner,
     109        21971 :             offset: start_offset,
     110        21971 :             buf: Vec::with_capacity(Self::CAPACITY),
     111        21971 :         }
     112        21971 :     }
     113              : 
     114      2395574 :     pub fn size(&self) -> u64 {
     115      2395574 :         self.offset
     116      2395574 :     }
     117              : 
     118              :     const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
     119              : 
     120              :     #[inline(always)]
     121              :     /// Writes the given buffer directly to the underlying `VirtualFile`.
     122              :     /// You need to make sure that the internal buffer is empty, otherwise
     123              :     /// data will be written in wrong order.
     124       470923 :     async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
     125       470923 :         self.inner.write_all(src_buf).await?;
     126       470923 :         self.offset += src_buf.len() as u64;
     127       470923 :         Ok(())
     128       470923 :     }
     129              : 
     130              :     #[inline(always)]
     131              :     /// Flushes the internal buffer to the underlying `VirtualFile`.
     132      4585098 :     pub async fn flush_buffer(&mut self) -> Result<(), Error> {
     133      4585098 :         self.inner.write_all(&self.buf).await?;
     134      4585097 :         self.buf.clear();
     135      4585097 :         Ok(())
     136      4585097 :     }
     137              : 
     138              :     #[inline(always)]
     139              :     /// Writes as much of `src_buf` into the internal buffer as it fits
     140    109640927 :     fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
     141    109640927 :         let remaining = Self::CAPACITY - self.buf.len();
     142    109640927 :         let to_copy = src_buf.len().min(remaining);
     143    109640927 :         self.buf.extend_from_slice(&src_buf[..to_copy]);
     144    109640927 :         self.offset += to_copy as u64;
     145    109640927 :         to_copy
     146    109640927 :     }
     147              : 
     148              :     /// Internal, possibly buffered, write function
     149    105553324 :     async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
     150    105553324 :         if !BUFFERED {
     151       466972 :             assert!(self.buf.is_empty());
     152       466972 :             self.write_all_unbuffered(src_buf).await?;
     153       466972 :             return Ok(());
     154    105086352 :         }
     155    105086352 :         let remaining = Self::CAPACITY - self.buf.len();
     156    105086352 :         // First try to copy as much as we can into the buffer
     157    105086352 :         if remaining > 0 {
     158    105086352 :             let copied = self.write_into_buffer(src_buf);
     159    105086352 :             src_buf = &src_buf[copied..];
     160    105086352 :         }
     161              :         // Then, if the buffer is full, flush it out
     162    105086352 :         if self.buf.len() == Self::CAPACITY {
     163      4569159 :             self.flush_buffer().await?;
     164    100517193 :         }
     165              :         // Finally, write the tail of src_buf:
     166              :         // If it wholly fits into the buffer without
     167              :         // completely filling it, then put it there.
     168              :         // If not, write it out directly.
     169    105086351 :         if !src_buf.is_empty() {
     170      4558526 :             assert_eq!(self.buf.len(), 0);
     171      4558526 :             if src_buf.len() < Self::CAPACITY {
     172      4554575 :                 let copied = self.write_into_buffer(src_buf);
     173      4554575 :                 // We just verified above that src_buf fits into our internal buffer.
     174      4554575 :                 assert_eq!(copied, src_buf.len());
     175              :             } else {
     176         3951 :                 self.write_all_unbuffered(src_buf).await?;
     177              :             }
     178    100527825 :         }
     179    105086351 :         Ok(())
     180    105553323 :     }
     181              : 
     182              :     /// Write a blob of data. Returns the offset that it was written to,
     183              :     /// which can be used to retrieve the data later.
     184     52776662 :     pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
     185     52776662 :         let offset = self.offset;
     186     52776662 : 
     187     52776662 :         if srcbuf.len() < 128 {
     188              :             // Short blob. Write a 1-byte length header
     189     32095843 :             let len_buf = srcbuf.len() as u8;
     190     32095843 :             self.write_all(&[len_buf]).await?;
     191              :         } else {
     192              :             // Write a 4-byte length header
     193     20680819 :             if srcbuf.len() > 0x7fff_ffff {
     194            0 :                 return Err(Error::new(
     195            0 :                     ErrorKind::Other,
     196            0 :                     format!("blob too large ({} bytes)", srcbuf.len()),
     197            0 :                 ));
     198     20680819 :             }
     199     20680819 :             let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
     200     20680819 :             len_buf[0] |= 0x80;
     201     20680819 :             self.write_all(&len_buf).await?;
     202              :         }
     203     52776662 :         self.write_all(srcbuf).await?;
     204     52776661 :         Ok(offset)
     205     52776661 :     }
     206              : }
     207              : 
     208              : impl BlobWriter<true> {
     209              :     /// Access the underlying `VirtualFile`.
     210              :     ///
     211              :     /// This function flushes the internal buffer before giving access
     212              :     /// to the underlying `VirtualFile`.
     213        15915 :     pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
     214        15915 :         self.flush_buffer().await?;
     215        15915 :         Ok(self.inner)
     216        15915 :     }
     217              : 
     218              :     /// Access the underlying `VirtualFile`.
     219              :     ///
     220              :     /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
     221              :     /// the internal buffer before giving access.
     222            0 :     pub fn into_inner_no_flush(self) -> VirtualFile {
     223            0 :         self.inner
     224            0 :     }
     225              : }
     226              : 
     227              : impl BlobWriter<false> {
     228              :     /// Access the underlying `VirtualFile`.
     229         6029 :     pub fn into_inner(self) -> VirtualFile {
     230         6029 :         self.inner
     231         6029 :     }
     232              : }
     233              : 
     234              : #[cfg(test)]
     235              : mod tests {
     236              :     use super::*;
     237              :     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
     238              :     use rand::{Rng, SeedableRng};
     239              : 
     240           24 :     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
     241           24 :         let temp_dir = camino_tempfile::tempdir()?;
     242           24 :         let pathbuf = temp_dir.path().join("file");
     243           24 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     244           24 : 
     245           24 :         // Write part (in block to drop the file)
     246           24 :         let mut offsets = Vec::new();
     247              :         {
     248           24 :             let file = VirtualFile::create(pathbuf.as_path()).await?;
     249           24 :             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
     250         8236 :             for blob in blobs.iter() {
     251         8236 :                 let offs = wtr.write_blob(blob).await?;
     252         8236 :                 offsets.push(offs);
     253              :             }
     254              :             // Write out one page worth of zeros so that we can
     255              :             // read again with read_blk
     256           24 :             let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
     257           24 :             println!("Writing final blob at offs={offs}");
     258           24 :             wtr.flush_buffer().await?;
     259              :         }
     260              : 
     261           24 :         let file = VirtualFile::open(pathbuf.as_path()).await?;
     262           24 :         let rdr = BlockReaderRef::VirtualFile(&file);
     263           24 :         let rdr = BlockCursor::new(rdr);
     264         8236 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     265        10256 :             let blob_read = rdr.read_blob(*offset, &ctx).await?;
     266         8236 :             assert_eq!(
     267         8236 :                 blob, &blob_read,
     268            0 :                 "mismatch for idx={idx} at offset={offset}"
     269              :             );
     270              :         }
     271           24 :         Ok(())
     272           24 :     }
     273              : 
     274         4104 :     fn random_array(len: usize) -> Vec<u8> {
     275         4104 :         let mut rng = rand::thread_rng();
     276     49064544 :         (0..len).map(|_| rng.gen()).collect::<_>()
     277         4104 :     }
     278              : 
     279            2 :     #[tokio::test]
     280            2 :     async fn test_one() -> Result<(), Error> {
     281            2 :         let blobs = &[vec![12, 21, 22]];
     282            4 :         round_trip_test::<false>(blobs).await?;
     283            3 :         round_trip_test::<true>(blobs).await?;
     284            2 :         Ok(())
     285              :     }
     286              : 
     287            2 :     #[tokio::test]
     288            2 :     async fn test_hello_simple() -> Result<(), Error> {
     289            2 :         let blobs = &[
     290            2 :             vec![0, 1, 2, 3],
     291            2 :             b"Hello, World!".to_vec(),
     292            2 :             Vec::new(),
     293            2 :             b"foobar".to_vec(),
     294            2 :         ];
     295            7 :         round_trip_test::<false>(blobs).await?;
     296            6 :         round_trip_test::<true>(blobs).await?;
     297            2 :         Ok(())
     298              :     }
     299              : 
     300            2 :     #[tokio::test]
     301            2 :     async fn test_really_big_array() -> Result<(), Error> {
     302            2 :         let blobs = &[
     303            2 :             b"test".to_vec(),
     304            2 :             random_array(10 * PAGE_SZ),
     305            2 :             b"foobar".to_vec(),
     306            2 :         ];
     307           16 :         round_trip_test::<false>(blobs).await?;
     308           15 :         round_trip_test::<true>(blobs).await?;
     309            2 :         Ok(())
     310              :     }
     311              : 
     312            2 :     #[tokio::test]
     313            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     314            2 :         let blobs = (0..PAGE_SZ / 8)
     315         2048 :             .map(|v| random_array(v * 16))
     316            2 :             .collect::<Vec<_>>();
     317         2145 :         round_trip_test::<false>(&blobs).await?;
     318         2110 :         round_trip_test::<true>(&blobs).await?;
     319            2 :         Ok(())
     320              :     }
     321              : 
     322            2 :     #[tokio::test]
     323            2 :     async fn test_arrays_random_size() -> Result<(), Error> {
     324            2 :         let mut rng = rand::rngs::StdRng::seed_from_u64(42);
     325            2 :         let blobs = (0..1024)
     326         2048 :             .map(|_| {
     327         2048 :                 let mut sz: u16 = rng.gen();
     328         2048 :                 // Make 50% of the arrays small
     329         2048 :                 if rng.gen() {
     330         1032 :                     sz &= 63;
     331         1032 :                 }
     332         2048 :                 random_array(sz.into())
     333         2048 :             })
     334            2 :             .collect::<Vec<_>>();
     335         3094 :         round_trip_test::<false>(&blobs).await?;
     336         3055 :         round_trip_test::<true>(&blobs).await?;
     337            2 :         Ok(())
     338              :     }
     339              : 
     340            2 :     #[tokio::test]
     341            2 :     async fn test_arrays_page_boundary() -> Result<(), Error> {
     342            2 :         let blobs = &[
     343            2 :             random_array(PAGE_SZ - 4),
     344            2 :             random_array(PAGE_SZ - 4),
     345            2 :             random_array(PAGE_SZ - 4),
     346            2 :         ];
     347            6 :         round_trip_test::<false>(blobs).await?;
     348            5 :         round_trip_test::<true>(blobs).await?;
     349            2 :         Ok(())
     350              :     }
     351              : }
        

Generated by: LCOV version 2.1-beta