LCOV - code coverage report
Current view: top level - pageserver/src/tenant - blob_io.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 95.2 % 83 79
Test Date: 2023-09-06 10:18:01 Functions: 100.0 % 14 14

            Line data    Source code
       1              : //!
       2              : //! Functions for reading and writing variable-sized "blobs".
       3              : //!
       4              : //! Each blob begins with a 1- or 4-byte length field, followed by the
       5              : //! actual data. If the length is smaller than 128 bytes, the length
       6              : //! is written as one byte. If it's larger than that, the length
       7              : //! is written as a four-byte integer, in big-endian, with the high
       8              : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
       9              : //! by peeking at the first byte.
      10              : //!
      11              : //! len <  128: 0XXXXXXX
      12              : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      13              : //!
      14              : use crate::page_cache::PAGE_SZ;
      15              : use crate::tenant::block_io::BlockCursor;
      16              : use std::cmp::min;
      17              : use std::io::{Error, ErrorKind};
      18              : 
      19              : impl<'a> BlockCursor<'a> {
      20              :     /// Read a blob into a new buffer.
      21     66457620 :     pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
      22     66457384 :         let mut buf = Vec::new();
      23     66457384 :         self.read_blob_into_buf(offset, &mut buf).await?;
      24     66457382 :         Ok(buf)
      25     66457382 :     }
      26              :     /// Read blob into the given buffer. Any previous contents in the buffer
      27              :     /// are overwritten.
      28    302763375 :     pub async fn read_blob_into_buf(
      29    302763375 :         &self,
      30    302763375 :         offset: u64,
      31    302763375 :         dstbuf: &mut Vec<u8>,
      32    302763375 :     ) -> Result<(), std::io::Error> {
      33    302763071 :         let mut blknum = (offset / PAGE_SZ as u64) as u32;
      34    302763071 :         let mut off = (offset % PAGE_SZ as u64) as usize;
      35              : 
      36    302763071 :         let mut buf = self.read_blk(blknum).await?;
      37              : 
      38              :         // peek at the first byte, to determine if it's a 1- or 4-byte length
      39    302763071 :         let first_len_byte = buf[off];
      40    302763071 :         let len: usize = if first_len_byte < 0x80 {
      41              :             // 1-byte length header
      42    202114663 :             off += 1;
      43    202114663 :             first_len_byte as usize
      44              :         } else {
      45              :             // 4-byte length header
      46    100648408 :             let mut len_buf = [0u8; 4];
      47    100648408 :             let thislen = PAGE_SZ - off;
      48    100648408 :             if thislen < 4 {
      49              :                 // it is split across two pages
      50        34245 :                 len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
      51        34245 :                 blknum += 1;
      52        34245 :                 buf = self.read_blk(blknum).await?;
      53        34244 :                 len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
      54        34244 :                 off = 4 - thislen;
      55    100614163 :             } else {
      56    100614163 :                 len_buf.copy_from_slice(&buf[off..off + 4]);
      57    100614163 :                 off += 4;
      58    100614163 :             }
      59    100648407 :             len_buf[0] &= 0x7f;
      60    100648407 :             u32::from_be_bytes(len_buf) as usize
      61              :         };
      62              : 
      63    302763070 :         dstbuf.clear();
      64    302763070 :         dstbuf.reserve(len);
      65    302763070 : 
      66    302763070 :         // Read the payload
      67    302763070 :         let mut remain = len;
      68    615065283 :         while remain > 0 {
      69    312302214 :             let mut page_remain = PAGE_SZ - off;
      70    312302214 :             if page_remain == 0 {
      71              :                 // continue on next page
      72      9577769 :                 blknum += 1;
      73      9577769 :                 buf = self.read_blk(blknum).await?;
      74      9577768 :                 off = 0;
      75      9577768 :                 page_remain = PAGE_SZ;
      76    302724445 :             }
      77    312302213 :             let this_blk_len = min(remain, page_remain);
      78    312302213 :             dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
      79    312302213 :             remain -= this_blk_len;
      80    312302213 :             off += this_blk_len;
      81              :         }
      82    302763069 :         Ok(())
      83    302763069 :     }
      84              : }
      85              : 
      86              : ///
      87              : /// An implementation of BlobWriter to write blobs to anything that
      88              : /// implements std::io::Write.
      89              : ///
      90              : pub struct WriteBlobWriter<W> {
      91              :     inner: W,
      92              :     offset: u64,
      93              : }
      94              : 
      95              : impl<W> WriteBlobWriter<W> {
      96        16627 :     pub fn new(inner: W, start_offset: u64) -> Self {
      97        16627 :         WriteBlobWriter {
      98        16627 :             inner,
      99        16627 :             offset: start_offset,
     100        16627 :         }
     101        16627 :     }
     102              : 
     103      1768535 :     pub fn size(&self) -> u64 {
     104      1768535 :         self.offset
     105      1768535 :     }
     106              : 
     107              :     /// Access the underlying Write object.
     108              :     ///
     109              :     /// NOTE: WriteBlobWriter keeps track of the current write offset. If
     110              :     /// you write something directly to the inner Write object, it makes the
     111              :     /// internally tracked 'offset' go out of sync. So don't do that.
     112        16620 :     pub fn into_inner(self) -> W {
     113        16620 :         self.inner
     114        16620 :     }
     115              : }
     116              : 
     117              : impl<W> WriteBlobWriter<W>
     118              : where
     119              :     W: std::io::Write,
     120              : {
     121              :     /// Write a blob of data. Returns the offset that it was written to,
     122              :     /// which can be used to retrieve the data later.
     123     91720901 :     pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
     124     91720901 :         let offset = self.offset;
     125     91720901 : 
     126     91720901 :         if srcbuf.len() < 128 {
     127              :             // Short blob. Write a 1-byte length header
     128     54391508 :             let len_buf = srcbuf.len() as u8;
     129     54391508 :             self.inner.write_all(&[len_buf])?;
     130     54391508 :             self.offset += 1;
     131              :         } else {
     132              :             // Write a 4-byte length header
     133     37329393 :             if srcbuf.len() > 0x7fff_ffff {
     134            0 :                 return Err(Error::new(
     135            0 :                     ErrorKind::Other,
     136            0 :                     format!("blob too large ({} bytes)", srcbuf.len()),
     137            0 :                 ));
     138     37329393 :             }
     139     37329393 :             let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
     140     37329393 :             len_buf[0] |= 0x80;
     141     37329393 :             self.inner.write_all(&len_buf)?;
     142     37329393 :             self.offset += 4;
     143              :         }
     144     91720901 :         self.inner.write_all(srcbuf)?;
     145     91720901 :         self.offset += srcbuf.len() as u64;
     146     91720901 :         Ok(offset)
     147     91720901 :     }
     148              : }
        

Generated by: LCOV version 2.1-beta