LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: 36a768e0f9bed31ef74078bc2bec5bd5390d3ce6.info Lines: 77.8 % 135 105
Test Date: 2024-07-09 16:10:09 Functions: 83.3 % 24 20

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use super::ephemeral_file::EphemeralFile;
       6              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       7              : use crate::context::RequestContext;
       8              : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
       9              : use crate::virtual_file::VirtualFile;
      10              : use bytes::Bytes;
      11              : use std::ops::Deref;
      12              : 
      13              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      14              : /// blocks, using the page cache
      15              : ///
      16              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      17              : /// below.
      18              : pub trait BlockReader {
      19              :     ///
      20              :     /// Create a new "cursor" for reading from this reader.
      21              :     ///
      22              :     /// A cursor caches the last accessed page, allowing for faster
      23              :     /// access if the same block is accessed repeatedly.
      24              :     fn block_cursor(&self) -> BlockCursor<'_>;
      25              : }
      26              : 
      27              : impl<B> BlockReader for &B
      28              : where
      29              :     B: BlockReader,
      30              : {
      31       210798 :     fn block_cursor(&self) -> BlockCursor<'_> {
      32       210798 :         (*self).block_cursor()
      33       210798 :     }
      34              : }
      35              : 
      36              : /// Reference to an in-memory copy of an immutable on-disk block.
      37              : pub enum BlockLease<'a> {
      38              :     PageReadGuard(PageReadGuard<'static>),
      39              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      40              :     Slice(&'a [u8; PAGE_SZ]),
      41              :     #[cfg(test)]
      42              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      43              :     #[cfg(test)]
      44              :     Vec(Vec<u8>),
      45              : }
      46              : 
      47              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      48      2739717 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      49      2739717 :         BlockLease::PageReadGuard(value)
      50      2739717 :     }
      51              : }
      52              : 
      53              : #[cfg(test)]
      54              : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
      55      1016766 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      56      1016766 :         BlockLease::Arc(value)
      57      1016766 :     }
      58              : }
      59              : 
      60              : impl<'a> Deref for BlockLease<'a> {
      61              :     type Target = [u8; PAGE_SZ];
      62              : 
      63     15853472 :     fn deref(&self) -> &Self::Target {
      64     15853472 :         match self {
      65     13498801 :             BlockLease::PageReadGuard(v) => v.deref(),
      66      1302613 :             BlockLease::EphemeralFileMutableTail(v) => v,
      67            0 :             BlockLease::Slice(v) => v,
      68              :             #[cfg(test)]
      69      1016766 :             BlockLease::Arc(v) => v.deref(),
      70              :             #[cfg(test)]
      71        35292 :             BlockLease::Vec(v) => {
      72        35292 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      73              :             }
      74              :         }
      75     15853472 :     }
      76              : }
      77              : 
      78              : /// Provides the ability to read blocks from different sources,
      79              : /// similar to using traits for this purpose.
      80              : ///
      81              : /// Unlike traits, we also support the read function to be async though.
      82              : pub(crate) enum BlockReaderRef<'a> {
      83              :     FileBlockReader(&'a FileBlockReader<'a>),
      84              :     EphemeralFile(&'a EphemeralFile),
      85              :     Adapter(Adapter<&'a DeltaLayerInner>),
      86              :     Slice(&'a [u8]),
      87              :     #[cfg(test)]
      88              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      89              :     #[cfg(test)]
      90              :     VirtualFile(&'a VirtualFile),
      91              : }
      92              : 
      93              : impl<'a> BlockReaderRef<'a> {
      94              :     #[inline(always)]
      95      8732158 :     async fn read_blk(
      96      8732158 :         &self,
      97      8732158 :         blknum: u32,
      98      8732158 :         ctx: &RequestContext,
      99      8732158 :     ) -> Result<BlockLease, std::io::Error> {
     100      8732158 :         use BlockReaderRef::*;
     101      8732158 :         match self {
     102       655625 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     103      4955849 :             EphemeralFile(r) => r.read_blk(blknum, ctx).await,
     104      2083002 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     105            0 :             Slice(s) => Self::read_blk_slice(s, blknum),
     106              :             #[cfg(test)]
     107      1016766 :             TestDisk(r) => r.read_blk(blknum),
     108              :             #[cfg(test)]
     109        20916 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     110              :         }
     111      8732158 :     }
     112              : }
     113              : 
     114              : impl<'a> BlockReaderRef<'a> {
     115            0 :     fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
     116            0 :         let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
     117            0 :         let end = start.checked_add(PAGE_SZ).unwrap();
     118            0 :         if end > slice.len() {
     119            0 :             return Err(std::io::Error::new(
     120            0 :                 std::io::ErrorKind::UnexpectedEof,
     121            0 :                 format!("slice too short, len={} end={}", slice.len(), end),
     122            0 :             ));
     123            0 :         }
     124            0 :         let slice = &slice[start..end];
     125            0 :         let page_sized: &[u8; PAGE_SZ] = slice
     126            0 :             .try_into()
     127            0 :             .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
     128            0 :         Ok(BlockLease::Slice(page_sized))
     129            0 :     }
     130              : }
     131              : 
     132              : ///
     133              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     134              : ///
     135              : /// 'read_blk' returns a 'BlockLease' for the requested page (not the cursor
     136              : /// itself); the page contents are accessed by dereferencing that lease, so
     137              : /// keep the lease around for as long as the page is needed. For example:
     138              : ///
     139              : /// ```no_run
     140              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     141              : /// # use pageserver::context::RequestContext;
     142              : /// # let reader: FileBlockReader = unimplemented!("stub");
     143              : /// # let ctx: RequestContext = unimplemented!("stub");
     144              : /// let cursor = reader.block_cursor();
     145              : /// let buf = cursor.read_blk(1, &ctx);
     146              : /// // do stuff with 'buf'
     147              : /// let buf = cursor.read_blk(2, &ctx);
     148              : /// // do stuff with 'buf'
     149              : /// ```
     150              : ///
     151              : pub struct BlockCursor<'a> {
     152              :     pub(super) read_compressed: bool,
     153              :     reader: BlockReaderRef<'a>,
     154              : }
     155              : 
     156              : impl<'a> BlockCursor<'a> {
     157      3082456 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     158      3082456 :         Self::new_with_compression(reader, false)
     159      3082456 :     }
     160      3569064 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     161      3569064 :         BlockCursor {
     162      3569064 :             read_compressed,
     163      3569064 :             reader,
     164      3569064 :         }
     165      3569064 :     }
     166              :     // Needed by cli
     167            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     168            0 :         BlockCursor {
     169            0 :             read_compressed: false,
     170            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     171            0 :         }
     172            0 :     }
     173              : 
     174              :     /// Read a block.
     175              :     ///
     176              :     /// Returns a "lease" object that can be used to
     177              :     /// access the contents of the page. (For the page cache, the
     178              :     /// lease object represents a lock on the buffer.)
     179              :     #[inline(always)]
     180      8732158 :     pub async fn read_blk(
     181      8732158 :         &self,
     182      8732158 :         blknum: u32,
     183      8732158 :         ctx: &RequestContext,
     184      8732158 :     ) -> Result<BlockLease, std::io::Error> {
     185      8732158 :         self.reader.read_blk(blknum, ctx).await
     186      8732158 :     }
     187              : }
     188              : 
     189              : /// An adapter for reading a (virtual) file using the page cache.
     190              : ///
     191              : /// The file is assumed to be immutable. This doesn't provide any functions
     192              : /// for modifying the file, nor for invalidating the cache if it is modified.
     193              : #[derive(Clone)]
     194              : pub struct FileBlockReader<'a> {
     195              :     pub file: &'a VirtualFile,
     196              : 
     197              :     /// Unique ID of this file, used as key in the page cache.
     198              :     file_id: page_cache::FileId,
     199              : 
     200              :     compressed_reads: bool,
     201              : }
     202              : 
     203              : impl<'a> FileBlockReader<'a> {
     204      2288625 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     205      2288625 :         Self::new_with_compression(file, file_id, false)
     206      2288625 :     }
     207              : 
     208      2295880 :     pub fn new_with_compression(
     209      2295880 :         file: &'a VirtualFile,
     210      2295880 :         file_id: FileId,
     211      2295880 :         compressed_reads: bool,
     212      2295880 :     ) -> Self {
     213      2295880 :         FileBlockReader {
     214      2295880 :             file_id,
     215      2295880 :             file,
     216      2295880 :             compressed_reads,
     217      2295880 :         }
     218      2295880 :     }
     219              : 
     220              :     /// Read a page from the underlying file into the given buffer.
     221        60136 :     async fn fill_buffer(
     222        60136 :         &self,
     223        60136 :         buf: PageWriteGuard<'static>,
     224        60136 :         blkno: u32,
     225        60136 :         ctx: &RequestContext,
     226        60136 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     227        60136 :         assert!(buf.len() == PAGE_SZ);
     228        60136 :         self.file
     229        60136 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     230        36353 :             .await
     231        60136 :     }
     232              :     /// Read a block.
     233              :     ///
     234              :     /// Returns a "lease" object that can be used to
     235              :     /// access the contents of the page. (For the page cache, the
     236              :     /// lease object represents a lock on the buffer.)
     237      2739717 :     pub async fn read_blk<'b>(
     238      2739717 :         &self,
     239      2739717 :         blknum: u32,
     240      2739717 :         ctx: &RequestContext,
     241      2739717 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     242      2739717 :         let cache = page_cache::get();
     243      2739717 :         match cache
     244      2739717 :             .read_immutable_buf(self.file_id, blknum, ctx)
     245        33971 :             .await
     246      2739717 :             .map_err(|e| {
     247            0 :                 std::io::Error::new(
     248            0 :                     std::io::ErrorKind::Other,
     249            0 :                     format!("Failed to read immutable buf: {e:#}"),
     250            0 :                 )
     251      2739717 :             })? {
     252      2679581 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     253        60136 :             ReadBufResult::NotFound(write_guard) => {
     254              :                 // Read the page from disk into the buffer
     255        60136 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     256        60136 :                 Ok(write_guard.mark_valid().into())
     257              :             }
     258              :         }
     259      2739717 :     }
     260              : }
     261              : 
     262              : impl BlockReader for FileBlockReader<'_> {
     263       486576 :     fn block_cursor(&self) -> BlockCursor<'_> {
     264       486576 :         BlockCursor::new_with_compression(
     265       486576 :             BlockReaderRef::FileBlockReader(self),
     266       486576 :             self.compressed_reads,
     267       486576 :         )
     268       486576 :     }
     269              : }
     270              : 
     271              : ///
     272              : /// Trait for block-oriented output
     273              : ///
     274              : pub trait BlockWriter {
     275              :     ///
     276              :     /// Write a page to the underlying storage.
     277              :     ///
     278              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     279              :     /// written to.
     280              :     ///
     281              :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
     282              : }
     283              : 
     284              : ///
     285              : /// A simple in-memory buffer of blocks.
     286              : ///
     287              : pub struct BlockBuf {
     288              :     pub blocks: Vec<Bytes>,
     289              : }
     290              : impl BlockWriter for BlockBuf {
     291        14223 :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
     292        14223 :         assert!(buf.len() == PAGE_SZ);
     293        14223 :         let blknum = self.blocks.len();
     294        14223 :         self.blocks.push(buf);
     295        14223 :         Ok(blknum as u32)
     296        14223 :     }
     297              : }
     298              : 
     299              : impl BlockBuf {
     300         1572 :     pub fn new() -> Self {
     301         1572 :         BlockBuf { blocks: Vec::new() }
     302         1572 :     }
     303              : 
     304      2023972 :     pub fn size(&self) -> u64 {
     305      2023972 :         (self.blocks.len() * PAGE_SZ) as u64
     306      2023972 :     }
     307              : }
     308              : impl Default for BlockBuf {
     309            0 :     fn default() -> Self {
     310            0 :         Self::new()
     311            0 :     }
     312              : }
        

Generated by: LCOV version 2.1-beta