LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: 7eb96e224e685167ad85f58f858387d8cf253f63.info Lines: 86.4 % 110 95
Test Date: 2024-09-23 21:23:07 Functions: 82.6 % 23 19

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       6              : use crate::context::RequestContext;
       7              : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
       8              : use crate::virtual_file::VirtualFile;
       9              : use bytes::Bytes;
      10              : use std::ops::Deref;
      11              : 
      12              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      13              : /// blocks, using the page cache
      14              : ///
      15              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      16              : /// below.
      17              : pub trait BlockReader {
      18              :     ///
      19              :     /// Create a new "cursor" for reading from this reader.
      20              :     ///
      21              :     /// A cursor caches the last accessed page, allowing for faster
      22              :     /// access if the same block is accessed repeatedly.
      23              :     fn block_cursor(&self) -> BlockCursor<'_>;
      24              : }
      25              : 
      26              : impl<B> BlockReader for &B
      27              : where
      28              :     B: BlockReader,
      29              : {
      30           60 :     fn block_cursor(&self) -> BlockCursor<'_> {
      31           60 :         (*self).block_cursor()
      32           60 :     }
      33              : }
      34              : 
      35              : /// Reference to an in-memory copy of an immutable on-disk block.
      36              : pub enum BlockLease<'a> {
      37              :     PageReadGuard(PageReadGuard<'static>),
      38              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      39              :     Slice(&'a [u8; PAGE_SZ]),
      40              :     #[cfg(test)]
      41              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      42              :     #[cfg(test)]
      43              :     Vec(Vec<u8>),
      44              : }
      45              : 
      46              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      47      1652993 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      48      1652993 :         BlockLease::PageReadGuard(value)
      49      1652993 :     }
      50              : }
      51              : 
      52              : #[cfg(test)]
      53              : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
      54      3050186 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      55      3050186 :         BlockLease::Arc(value)
      56      3050186 :     }
      57              : }
      58              : 
      59              : impl<'a> Deref for BlockLease<'a> {
      60              :     type Target = [u8; PAGE_SZ];
      61              : 
      62      4809295 :     fn deref(&self) -> &Self::Target {
      63      4809295 :         match self {
      64      1653233 :             BlockLease::PageReadGuard(v) => v.deref(),
      65            0 :             BlockLease::EphemeralFileMutableTail(v) => v,
      66            0 :             BlockLease::Slice(v) => v,
      67              :             #[cfg(test)]
      68      3050186 :             BlockLease::Arc(v) => v.deref(),
      69              :             #[cfg(test)]
      70       105876 :             BlockLease::Vec(v) => {
      71       105876 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      72              :             }
      73              :         }
      74      4809295 :     }
      75              : }
      76              : 
      77              : /// Provides the ability to read blocks from different sources,
      78              : /// similar to using traits for this purpose.
      79              : ///
      80              : /// Unlike traits, we also support the read function to be async though.
      81              : pub(crate) enum BlockReaderRef<'a> {
      82              :     FileBlockReader(&'a FileBlockReader<'a>),
      83              :     Adapter(Adapter<&'a DeltaLayerInner>),
      84              :     #[cfg(test)]
      85              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      86              :     #[cfg(test)]
      87              :     VirtualFile(&'a VirtualFile),
      88              : }
      89              : 
      90              : impl<'a> BlockReaderRef<'a> {
      91              :     #[inline(always)]
      92      4762477 :     async fn read_blk(
      93      4762477 :         &self,
      94      4762477 :         blknum: u32,
      95      4762477 :         ctx: &RequestContext,
      96      4762477 :     ) -> Result<BlockLease, std::io::Error> {
      97              :         use BlockReaderRef::*;
      98      4762477 :         match self {
      99      1649519 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     100           24 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     101              :             #[cfg(test)]
     102      3050186 :             TestDisk(r) => r.read_blk(blknum),
     103              :             #[cfg(test)]
     104        62748 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     105              :         }
     106      4762477 :     }
     107              : }
     108              : 
     109              : ///
     110              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     111              : ///
     112              : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
     113              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     114              : /// the underlying BlockReader. For example:
     115              : ///
     116              : /// ```no_run
     117              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     118              : /// # use pageserver::context::RequestContext;
     119              : /// # let reader: FileBlockReader = unimplemented!("stub");
     120              : /// # let ctx: RequestContext = unimplemented!("stub");
     121              : /// let cursor = reader.block_cursor();
     122              : /// let buf = cursor.read_blk(1, &ctx);
     123              : /// // do stuff with 'buf'
     124              : /// let buf = cursor.read_blk(2, &ctx);
     125              : /// // do stuff with 'buf'
     126              : /// ```
     127              : ///
     128              : pub struct BlockCursor<'a> {
     129              :     pub(super) read_compressed: bool,
     130              :     reader: BlockReaderRef<'a>,
     131              : }
     132              : 
     133              : impl<'a> BlockCursor<'a> {
     134      1233634 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     135      1233634 :         Self::new_with_compression(reader, false)
     136      1233634 :     }
     137      2066275 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     138      2066275 :         BlockCursor {
     139      2066275 :             read_compressed,
     140      2066275 :             reader,
     141      2066275 :         }
     142      2066275 :     }
     143              :     // Needed by cli
     144            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     145            0 :         BlockCursor {
     146            0 :             read_compressed: false,
     147            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     148            0 :         }
     149            0 :     }
     150              : 
     151              :     /// Read a block.
     152              :     ///
     153              :     /// Returns a "lease" object that can be used to
     154              :     /// access to the contents of the page. (For the page cache, the
     155              :     /// lease object represents a lock on the buffer.)
     156              :     #[inline(always)]
     157      4762477 :     pub async fn read_blk(
     158      4762477 :         &self,
     159      4762477 :         blknum: u32,
     160      4762477 :         ctx: &RequestContext,
     161      4762477 :     ) -> Result<BlockLease, std::io::Error> {
     162      4762477 :         self.reader.read_blk(blknum, ctx).await
     163      4762477 :     }
     164              : }
     165              : 
     166              : /// An adapter for reading a (virtual) file using the page cache.
     167              : ///
     168              : /// The file is assumed to be immutable. This doesn't provide any functions
     169              : /// for modifying the file, nor for invalidating the cache if it is modified.
     170              : #[derive(Clone)]
     171              : pub struct FileBlockReader<'a> {
     172              :     pub file: &'a VirtualFile,
     173              : 
     174              :     /// Unique ID of this file, used as key in the page cache.
     175              :     file_id: page_cache::FileId,
     176              : 
     177              :     compressed_reads: bool,
     178              : }
     179              : 
     180              : impl<'a> FileBlockReader<'a> {
     181       644233 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     182       644233 :         FileBlockReader {
     183       644233 :             file_id,
     184       644233 :             file,
     185       644233 :             compressed_reads: true,
     186       644233 :         }
     187       644233 :     }
     188              : 
     189              :     /// Read a page from the underlying file into given buffer.
     190        96482 :     async fn fill_buffer(
     191        96482 :         &self,
     192        96482 :         buf: PageWriteGuard<'static>,
     193        96482 :         blkno: u32,
     194        96482 :         ctx: &RequestContext,
     195        96482 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     196        96482 :         assert!(buf.len() == PAGE_SZ);
     197        96482 :         self.file
     198        96482 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     199        65822 :             .await
     200        96482 :     }
     201              :     /// Read a block.
     202              :     ///
     203              :     /// Returns a "lease" object that can be used to
     204              :     /// access to the contents of the page. (For the page cache, the
     205              :     /// lease object represents a lock on the buffer.)
     206      1652993 :     pub async fn read_blk<'b>(
     207      1652993 :         &self,
     208      1652993 :         blknum: u32,
     209      1652993 :         ctx: &RequestContext,
     210      1652993 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     211      1652993 :         let cache = page_cache::get();
     212      1652993 :         match cache
     213      1652993 :             .read_immutable_buf(self.file_id, blknum, ctx)
     214        15047 :             .await
     215      1652993 :             .map_err(|e| {
     216            0 :                 std::io::Error::new(
     217            0 :                     std::io::ErrorKind::Other,
     218            0 :                     format!("Failed to read immutable buf: {e:#}"),
     219            0 :                 )
     220      1652993 :             })? {
     221      1556511 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     222        96482 :             ReadBufResult::NotFound(write_guard) => {
     223              :                 // Read the page from disk into the buffer
     224        96482 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     225        96482 :                 Ok(write_guard.mark_valid().into())
     226              :             }
     227              :         }
     228      1652993 :     }
     229              : }
     230              : 
     231              : impl BlockReader for FileBlockReader<'_> {
     232       832545 :     fn block_cursor(&self) -> BlockCursor<'_> {
     233       832545 :         BlockCursor::new_with_compression(
     234       832545 :             BlockReaderRef::FileBlockReader(self),
     235       832545 :             self.compressed_reads,
     236       832545 :         )
     237       832545 :     }
     238              : }
     239              : 
     240              : ///
     241              : /// Trait for block-oriented output
     242              : ///
     243              : pub trait BlockWriter {
     244              :     ///
     245              :     /// Write a page to the underlying storage.
     246              :     ///
     247              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     248              :     /// written to.
     249              :     ///
     250              :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
     251              : }
     252              : 
     253              : ///
     254              : /// A simple in-memory buffer of blocks.
     255              : ///
     256              : pub struct BlockBuf {
     257              :     pub blocks: Vec<Bytes>,
     258              : }
     259              : impl BlockWriter for BlockBuf {
     260        43151 :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
     261        43151 :         assert!(buf.len() == PAGE_SZ);
     262        43151 :         let blknum = self.blocks.len();
     263        43151 :         self.blocks.push(buf);
     264        43151 :         Ok(blknum as u32)
     265        43151 :     }
     266              : }
     267              : 
     268              : impl BlockBuf {
     269         5736 :     pub fn new() -> Self {
     270         5736 :         BlockBuf { blocks: Vec::new() }
     271         5736 :     }
     272              : 
     273      6142314 :     pub fn size(&self) -> u64 {
     274      6142314 :         (self.blocks.len() * PAGE_SZ) as u64
     275      6142314 :     }
     276              : }
     277              : impl Default for BlockBuf {
     278            0 :     fn default() -> Self {
     279            0 :         Self::new()
     280            0 :     }
     281              : }
        

Generated by: LCOV version 2.1-beta