LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 86.4 % 110 95
Test Date: 2024-10-22 22:13:45 Functions: 82.6 % 23 19

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       6              : use crate::context::RequestContext;
       7              : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
       8              : #[cfg(test)]
       9              : use crate::virtual_file::IoBufferMut;
      10              : use crate::virtual_file::VirtualFile;
      11              : use bytes::Bytes;
      12              : use std::ops::Deref;
      13              : 
      14              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      15              : /// blocks, using the page cache
      16              : ///
      17              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      18              : /// below.
      19              : pub trait BlockReader {
      20              :     ///
      21              :     /// Create a new "cursor" for reading from this reader.
      22              :     ///
      23              :     /// A cursor caches the last accessed page, allowing for faster
      24              :     /// access if the same block is accessed repeatedly.
      25              :     fn block_cursor(&self) -> BlockCursor<'_>;
      26              : }
      27              : 
      28              : impl<B> BlockReader for &B
      29              : where
      30              :     B: BlockReader,
      31              : {
      32           20 :     fn block_cursor(&self) -> BlockCursor<'_> {
      33           20 :         (*self).block_cursor()
      34           20 :     }
      35              : }
      36              : 
      37              : /// Reference to an in-memory copy of an immutable on-disk block.
      38              : pub enum BlockLease<'a> {
      39              :     PageReadGuard(PageReadGuard<'static>),
      40              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      41              :     Slice(&'a [u8; PAGE_SZ]),
      42              :     #[cfg(test)]
      43              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      44              :     #[cfg(test)]
      45              :     IoBufferMut(IoBufferMut),
      46              : }
      47              : 
      48              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      49       550718 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      50       550718 :         BlockLease::PageReadGuard(value)
      51       550718 :     }
      52              : }
      53              : 
      54              : #[cfg(test)]
      55              : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
      56      1016630 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      57      1016630 :         BlockLease::Arc(value)
      58      1016630 :     }
      59              : }
      60              : 
      61              : impl Deref for BlockLease<'_> {
      62              :     type Target = [u8; PAGE_SZ];
      63              : 
      64      1602720 :     fn deref(&self) -> &Self::Target {
      65      1602720 :         match self {
      66       550798 :             BlockLease::PageReadGuard(v) => v.deref(),
      67            0 :             BlockLease::EphemeralFileMutableTail(v) => v,
      68            0 :             BlockLease::Slice(v) => v,
      69              :             #[cfg(test)]
      70      1016630 :             BlockLease::Arc(v) => v.deref(),
      71              :             #[cfg(test)]
      72        35292 :             BlockLease::IoBufferMut(v) => {
      73        35292 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      74              :             }
      75              :         }
      76      1602720 :     }
      77              : }
      78              : 
      79              : /// Provides the ability to read blocks from different sources,
      80              : /// similar to using traits for this purpose.
      81              : ///
      82              : /// Unlike traits, we also support the read function to be async though.
      83              : pub(crate) enum BlockReaderRef<'a> {
      84              :     FileBlockReader(&'a FileBlockReader<'a>),
      85              :     Adapter(Adapter<&'a DeltaLayerInner>),
      86              :     #[cfg(test)]
      87              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      88              :     #[cfg(test)]
      89              :     VirtualFile(&'a VirtualFile),
      90              : }
      91              : 
      92              : impl<'a> BlockReaderRef<'a> {
      93              :     #[inline(always)]
      94      1587116 :     async fn read_blk(
      95      1587116 :         &self,
      96      1587116 :         blknum: u32,
      97      1587116 :         ctx: &RequestContext,
      98      1587116 :     ) -> Result<BlockLease, std::io::Error> {
      99              :         use BlockReaderRef::*;
     100      1587116 :         match self {
     101       549562 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     102            8 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     103              :             #[cfg(test)]
     104      1016630 :             TestDisk(r) => r.read_blk(blknum),
     105              :             #[cfg(test)]
     106        20916 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     107              :         }
     108      1587116 :     }
     109              : }
     110              : 
     111              : ///
     112              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     113              : ///
     114              : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
     115              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     116              : /// the underlying BlockReader. For example:
     117              : ///
     118              : /// ```no_run
     119              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     120              : /// # use pageserver::context::RequestContext;
     121              : /// # let reader: FileBlockReader = unimplemented!("stub");
     122              : /// # let ctx: RequestContext = unimplemented!("stub");
     123              : /// let cursor = reader.block_cursor();
     124              : /// let buf = cursor.read_blk(1, &ctx);
     125              : /// // do stuff with 'buf'
     126              : /// let buf = cursor.read_blk(2, &ctx);
     127              : /// // do stuff with 'buf'
     128              : /// ```
     129              : ///
     130              : pub struct BlockCursor<'a> {
     131              :     pub(super) read_compressed: bool,
     132              :     reader: BlockReaderRef<'a>,
     133              : }
     134              : 
     135              : impl<'a> BlockCursor<'a> {
     136       411162 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     137       411162 :         Self::new_with_compression(reader, false)
     138       411162 :     }
     139       688524 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     140       688524 :         BlockCursor {
     141       688524 :             read_compressed,
     142       688524 :             reader,
     143       688524 :         }
     144       688524 :     }
     145              :     // Needed by cli
     146            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     147            0 :         BlockCursor {
     148            0 :             read_compressed: false,
     149            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     150            0 :         }
     151            0 :     }
     152              : 
     153              :     /// Read a block.
     154              :     ///
     155              :     /// Returns a "lease" object that can be used to
     156              :     /// access to the contents of the page. (For the page cache, the
     157              :     /// lease object represents a lock on the buffer.)
     158              :     #[inline(always)]
     159      1587116 :     pub async fn read_blk(
     160      1587116 :         &self,
     161      1587116 :         blknum: u32,
     162      1587116 :         ctx: &RequestContext,
     163      1587116 :     ) -> Result<BlockLease, std::io::Error> {
     164      1587116 :         self.reader.read_blk(blknum, ctx).await
     165      1587116 :     }
     166              : }
     167              : 
     168              : /// An adapter for reading a (virtual) file using the page cache.
     169              : ///
     170              : /// The file is assumed to be immutable. This doesn't provide any functions
     171              : /// for modifying the file, nor for invalidating the cache if it is modified.
     172              : #[derive(Clone)]
     173              : pub struct FileBlockReader<'a> {
     174              :     pub file: &'a VirtualFile,
     175              : 
     176              :     /// Unique ID of this file, used as key in the page cache.
     177              :     file_id: page_cache::FileId,
     178              : 
     179              :     compressed_reads: bool,
     180              : }
     181              : 
     182              : impl<'a> FileBlockReader<'a> {
     183       214491 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     184       214491 :         FileBlockReader {
     185       214491 :             file_id,
     186       214491 :             file,
     187       214491 :             compressed_reads: true,
     188       214491 :         }
     189       214491 :     }
     190              : 
     191              :     /// Read a page from the underlying file into given buffer.
     192        32007 :     async fn fill_buffer(
     193        32007 :         &self,
     194        32007 :         buf: PageWriteGuard<'static>,
     195        32007 :         blkno: u32,
     196        32007 :         ctx: &RequestContext,
     197        32007 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     198        32007 :         assert!(buf.len() == PAGE_SZ);
     199        32007 :         self.file
     200        32007 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     201        21751 :             .await
     202        32007 :     }
     203              :     /// Read a block.
     204              :     ///
     205              :     /// Returns a "lease" object that can be used to
     206              :     /// access to the contents of the page. (For the page cache, the
     207              :     /// lease object represents a lock on the buffer.)
     208       550718 :     pub async fn read_blk<'b>(
     209       550718 :         &self,
     210       550718 :         blknum: u32,
     211       550718 :         ctx: &RequestContext,
     212       550718 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     213       550718 :         let cache = page_cache::get();
     214       550718 :         match cache
     215       550718 :             .read_immutable_buf(self.file_id, blknum, ctx)
     216         4928 :             .await
     217       550718 :             .map_err(|e| {
     218            0 :                 std::io::Error::new(
     219            0 :                     std::io::ErrorKind::Other,
     220            0 :                     format!("Failed to read immutable buf: {e:#}"),
     221            0 :                 )
     222       550718 :             })? {
     223       518711 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     224        32007 :             ReadBufResult::NotFound(write_guard) => {
     225              :                 // Read the page from disk into the buffer
     226        32007 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     227        32007 :                 Ok(write_guard.mark_valid().into())
     228              :             }
     229              :         }
     230       550718 :     }
     231              : }
     232              : 
     233              : impl BlockReader for FileBlockReader<'_> {
     234       277330 :     fn block_cursor(&self) -> BlockCursor<'_> {
     235       277330 :         BlockCursor::new_with_compression(
     236       277330 :             BlockReaderRef::FileBlockReader(self),
     237       277330 :             self.compressed_reads,
     238       277330 :         )
     239       277330 :     }
     240              : }
     241              : 
     242              : ///
     243              : /// Trait for block-oriented output
     244              : ///
     245              : pub trait BlockWriter {
     246              :     ///
     247              :     /// Write a page to the underlying storage.
     248              :     ///
     249              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     250              :     /// written to.
     251              :     ///
     252              :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
     253              : }
     254              : 
     255              : ///
     256              : /// A simple in-memory buffer of blocks.
     257              : ///
     258              : pub struct BlockBuf {
     259              :     pub blocks: Vec<Bytes>,
     260              : }
     261              : impl BlockWriter for BlockBuf {
     262        14360 :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
     263        14360 :         assert!(buf.len() == PAGE_SZ);
     264        14360 :         let blknum = self.blocks.len();
     265        14360 :         self.blocks.push(buf);
     266        14360 :         Ok(blknum as u32)
     267        14360 :     }
     268              : }
     269              : 
     270              : impl BlockBuf {
     271         1906 :     pub fn new() -> Self {
     272         1906 :         BlockBuf { blocks: Vec::new() }
     273         1906 :     }
     274              : 
     275      2047438 :     pub fn size(&self) -> u64 {
     276      2047438 :         (self.blocks.len() * PAGE_SZ) as u64
     277      2047438 :     }
     278              : }
     279              : impl Default for BlockBuf {
     280            0 :     fn default() -> Self {
     281            0 :         Self::new()
     282            0 :     }
     283              : }
        

Generated by: LCOV version 2.1-beta