LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 84.3 % 83 70
Test Date: 2023-09-06 10:18:01 Functions: 85.7 % 21 18

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use super::ephemeral_file::EphemeralFile;
       6              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       7              : use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
       8              : use crate::virtual_file::VirtualFile;
       9              : use bytes::Bytes;
      10              : use std::ops::{Deref, DerefMut};
      11              : 
      12              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      13              : /// blocks, using the page cache
      14              : ///
      15              : /// There are currently two implementations: EphemeralFile and FileBlockReader
      16              : /// below.
      17              : pub trait BlockReader {
      18              :     ///
      19              :     /// Create a new "cursor" for reading from this reader.
      20              :     ///
      21              :     /// A cursor caches the last accessed page, allowing for faster
      22              :     /// access if the same block is accessed repeatedly.
      23              :     fn block_cursor(&self) -> BlockCursor<'_>;
      24              : }
      25              : 
      26              : impl<B> BlockReader for &B
      27              : where
      28              :     B: BlockReader,
      29              : {
      30     45700309 :     fn block_cursor(&self) -> BlockCursor<'_> {
      31     45700309 :         (*self).block_cursor()
      32     45700309 :     }
      33              : }
      34              : 
      35              : /// Reference to an in-memory copy of an immutable on-disk block.
      36              : pub enum BlockLease<'a> {
      37              :     PageReadGuard(PageReadGuard<'static>),
      38              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      39              :     #[cfg(test)]
      40              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      41              : }
      42              : 
      43              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      44    291337465 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      45    291337465 :         BlockLease::PageReadGuard(value)
      46    291337465 :     }
      47              : }
      48              : 
      49              : #[cfg(test)]
      50              : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
      51       508211 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      52       508211 :         BlockLease::Arc(value)
      53       508211 :     }
      54              : }
      55              : 
      56              : impl<'a> Deref for BlockLease<'a> {
      57              :     type Target = [u8; PAGE_SZ];
      58              : 
      59    791982016 :     fn deref(&self) -> &Self::Target {
      60    791982016 :         match self {
      61    784687836 :             BlockLease::PageReadGuard(v) => v.deref(),
      62      6785969 :             BlockLease::EphemeralFileMutableTail(v) => v,
      63              :             #[cfg(test)]
      64       508211 :             BlockLease::Arc(v) => v.deref(),
      65              :         }
      66    791982016 :     }
      67              : }
      68              : 
      69              : /// Provides the ability to read blocks from different sources,
      70              : /// similar to using traits for this purpose.
      71              : ///
      72              : /// Unlike a trait, this also allows the read function to be async.
      73              : pub(crate) enum BlockReaderRef<'a> {
      74              :     FileBlockReaderVirtual(&'a FileBlockReader),
      75              :     EphemeralFile(&'a EphemeralFile),
      76              :     Adapter(Adapter<&'a DeltaLayerInner>),
      77              :     #[cfg(test)]
      78              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      79              : }
      80              : 
      81              : impl<'a> BlockReaderRef<'a> {
      82              :     #[inline(always)]
      83    388596454 :     async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
      84    388596454 :         use BlockReaderRef::*;
      85    388596454 :         match self {
      86    257511401 :             FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
      87     96763021 :             EphemeralFile(r) => r.read_blk(blknum).await,
      88     33813821 :             Adapter(r) => r.read_blk(blknum).await,
      89              :             #[cfg(test)]
      90       508211 :             TestDisk(r) => r.read_blk(blknum),
      91              :         }
      92    388596452 :     }
      93              : }
      94              : 
      95              : ///
      96              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
      97              : ///
      98              : /// `read_blk` is async and returns a `BlockLease` for the requested block, so
      99              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     100              : /// the underlying BlockReader. For example:
     101              : ///
     102              : /// ```no_run
     103              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     104              : /// # let reader: FileBlockReader = unimplemented!("stub");
     105              : /// let cursor = reader.block_cursor();
     106              : /// let buf = cursor.read_blk(1);
     107              : /// // do stuff with 'buf'
     108              : /// let buf = cursor.read_blk(2);
     109              : /// // do stuff with 'buf'
     110              : /// ```
     111              : ///
     112              : pub struct BlockCursor<'a> {
     113              :     reader: BlockReaderRef<'a>,
     114              : }
     115              : 
     116              : impl<'a> BlockCursor<'a> {
     117    144311348 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     118    144311348 :         BlockCursor { reader }
     119    144311348 :     }
     120              :     // Needed by cli
     121            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     122            0 :         BlockCursor {
     123            0 :             reader: BlockReaderRef::FileBlockReaderVirtual(reader),
     124            0 :         }
     125            0 :     }
     126              : 
     127              :     /// Read a block.
     128              :     ///
     129              :     /// Returns a "lease" object that can be used to
     130              :     /// access the contents of the page. (For the page cache, the
     131              :     /// lease object represents a lock on the buffer.)
     132              :     #[inline(always)]
     133    388596337 :     pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
     134    388596454 :         self.reader.read_blk(blknum).await
     135    388596452 :     }
     136              : }
     137              : 
     138              : /// An adapter for reading a (virtual) file using the page cache.
     139              : ///
     140              : /// The file is assumed to be immutable. This doesn't provide any functions
     141              : /// for modifying the file, nor for invalidating the cache if it is modified.
     142              : pub struct FileBlockReader {
     143              :     pub file: VirtualFile,
     144              : 
     145              :     /// Unique ID of this file, used as key in the page cache.
     146              :     file_id: page_cache::FileId,
     147              : }
     148              : 
     149              : impl FileBlockReader {
     150        12140 :     pub fn new(file: VirtualFile) -> Self {
     151        12140 :         let file_id = page_cache::next_file_id();
     152        12140 : 
     153        12140 :         FileBlockReader { file_id, file }
     154        12140 :     }
     155              : 
     156              :     /// Read a page from the underlying file into given buffer.
     157      5359546 :     async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
     158      5359540 :         assert!(buf.len() == PAGE_SZ);
     159      5359540 :         self.file
     160      5359540 :             .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
     161            0 :             .await
     162      5359540 :     }
     163              :     /// Read a block.
     164              :     ///
     165              :     /// Returns a "lease" object that can be used to
     166              :     /// access the contents of the page. (For the page cache, the
     167              :     /// lease object represents a lock on the buffer.)
     168    291337479 :     pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
     169    291337362 :         let cache = page_cache::get();
     170              :         loop {
     171    296696891 :             match cache
     172    296696891 :                 .read_immutable_buf(self.file_id, blknum)
     173      1640566 :                 .await
     174    296696890 :                 .map_err(|e| {
     175            0 :                     std::io::Error::new(
     176            0 :                         std::io::ErrorKind::Other,
     177            0 :                         format!("Failed to read immutable buf: {e:#}"),
     178            0 :                     )
     179    296696890 :                 })? {
     180    291337350 :                 ReadBufResult::Found(guard) => break Ok(guard.into()),
     181      5359540 :                 ReadBufResult::NotFound(mut write_guard) => {
     182      5359540 :                     // Read the page from disk into the buffer
     183      5359540 :                     self.fill_buffer(write_guard.deref_mut(), blknum).await?;
     184      5359529 :                     write_guard.mark_valid();
     185      5359529 : 
     186      5359529 :                     // Swap for read lock
     187      5359529 :                     continue;
     188              :                 }
     189              :             };
     190              :         }
     191    291337361 :     }
     192              : }
     193              : 
     194              : impl BlockReader for FileBlockReader {
     195     91395711 :     fn block_cursor(&self) -> BlockCursor<'_> {
     196     91395711 :         BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
     197     91395711 :     }
     198              : }
     199              : 
     200              : ///
     201              : /// Trait for block-oriented output
     202              : ///
     203              : pub trait BlockWriter {
     204              :     ///
     205              :     /// Write a page to the underlying storage.
     206              :     ///
     207              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     208              :     /// written to.
     209              :     ///
     210              :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
     211              : }
     212              : 
     213              : ///
     214              : /// A simple in-memory buffer of blocks.
     215              : ///
     216              : pub struct BlockBuf {
     217              :     pub blocks: Vec<Bytes>,
     218              : }
     219              : impl BlockWriter for BlockBuf {
     220       193641 :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
     221       193641 :         assert!(buf.len() == PAGE_SZ);
     222       193641 :         let blknum = self.blocks.len();
     223       193641 :         self.blocks.push(buf);
     224       193641 :         Ok(blknum as u32)
     225       193641 :     }
     226              : }
     227              : 
     228              : impl BlockBuf {
     229        16627 :     pub fn new() -> Self {
     230        16627 :         BlockBuf { blocks: Vec::new() }
     231        16627 :     }
     232              : 
     233      1751919 :     pub fn size(&self) -> u64 {
     234      1751919 :         (self.blocks.len() * PAGE_SZ) as u64
     235      1751919 :     }
     236              : }
     237              : impl Default for BlockBuf {
     238            0 :     fn default() -> Self {
     239            0 :         Self::new()
     240            0 :     }
     241              : }
        

Generated by: LCOV version 2.1-beta