LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 86.4 % 110 95
Test Date: 2025-03-12 00:01:28 Functions: 82.6 % 23 19

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use std::ops::Deref;
       6              : 
       7              : use bytes::Bytes;
       8              : 
       9              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
      10              : use crate::context::RequestContext;
      11              : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
      12              : #[cfg(test)]
      13              : use crate::virtual_file::IoBufferMut;
      14              : use crate::virtual_file::VirtualFile;
      15              : 
      16              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      17              : /// blocks, using the page cache
      18              : ///
      19              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      20              : /// below.
      21              : pub trait BlockReader {
      22              :     ///
      23              :     /// Create a new "cursor" for reading from this reader.
      24              :     ///
      25              :     /// A cursor caches the last accessed page, allowing for faster
      26              :     /// access if the same block is accessed repeatedly.
      27              :     fn block_cursor(&self) -> BlockCursor<'_>;
      28              : }
      29              : 
      30              : impl<B> BlockReader for &B
      31              : where
      32              :     B: BlockReader,
      33              : {
      34           40 :     fn block_cursor(&self) -> BlockCursor<'_> {
      35           40 :         (*self).block_cursor()
      36           40 :     }
      37              : }
      38              : 
      39              : /// Reference to an in-memory copy of an immutable on-disk block.
      40              : pub enum BlockLease<'a> {
      41              :     PageReadGuard(PageReadGuard<'static>),
      42              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      43              :     Slice(&'a [u8; PAGE_SZ]),
      44              :     #[cfg(test)]
      45              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      46              :     #[cfg(test)]
      47              :     IoBufferMut(IoBufferMut),
      48              : }
      49              : 
      50              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      51       974273 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      52       974273 :         BlockLease::PageReadGuard(value)
      53       974273 :     }
      54              : }
      55              : 
      56              : #[cfg(test)]
      57              : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
      58      2033394 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      59      2033394 :         BlockLease::Arc(value)
      60      2033394 :     }
      61              : }
      62              : 
      63              : impl Deref for BlockLease<'_> {
      64              :     type Target = [u8; PAGE_SZ];
      65              : 
      66      3078411 :     fn deref(&self) -> &Self::Target {
      67      3078411 :         match self {
      68       974433 :             BlockLease::PageReadGuard(v) => v.deref(),
      69            0 :             BlockLease::EphemeralFileMutableTail(v) => v,
      70            0 :             BlockLease::Slice(v) => v,
      71              :             #[cfg(test)]
      72      2033394 :             BlockLease::Arc(v) => v.deref(),
      73              :             #[cfg(test)]
      74        70584 :             BlockLease::IoBufferMut(v) => {
      75        70584 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      76              :             }
      77              :         }
      78      3078411 :     }
      79              : }
      80              : 
      81              : /// Provides the ability to read blocks from different sources,
      82              : /// similar to using traits for this purpose.
      83              : ///
      84              : /// Unlike traits, we also support the read function to be async though.
      85              : pub(crate) enum BlockReaderRef<'a> {
      86              :     FileBlockReader(&'a FileBlockReader<'a>),
      87              :     Adapter(Adapter<&'a DeltaLayerInner>),
      88              :     #[cfg(test)]
      89              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      90              :     #[cfg(test)]
      91              :     VirtualFile(&'a VirtualFile),
      92              : }
      93              : 
      94              : impl BlockReaderRef<'_> {
      95              :     #[inline(always)]
      96      3047035 :     async fn read_blk(
      97      3047035 :         &self,
      98      3047035 :         blknum: u32,
      99      3047035 :         ctx: &RequestContext,
     100      3047035 :     ) -> Result<BlockLease, std::io::Error> {
     101              :         use BlockReaderRef::*;
     102      3047035 :         match self {
     103       971793 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     104           16 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     105              :             #[cfg(test)]
     106      2033394 :             TestDisk(r) => r.read_blk(blknum),
     107              :             #[cfg(test)]
     108        41832 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     109              :         }
     110      3047035 :     }
     111              : }
     112              : 
     113              : ///
     114              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     115              : ///
     116              : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
     117              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     118              : /// the underlying BlockReader. For example:
     119              : ///
     120              : /// ```no_run
     121              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     122              : /// # use pageserver::context::RequestContext;
     123              : /// # let reader: FileBlockReader = unimplemented!("stub");
     124              : /// # let ctx: RequestContext = unimplemented!("stub");
     125              : /// let cursor = reader.block_cursor();
     126              : /// let buf = cursor.read_blk(1, &ctx);
     127              : /// // do stuff with 'buf'
     128              : /// let buf = cursor.read_blk(2, &ctx);
     129              : /// // do stuff with 'buf'
     130              : /// ```
     131              : ///
     132              : pub struct BlockCursor<'a> {
     133              :     pub(super) read_compressed: bool,
     134              :     reader: BlockReaderRef<'a>,
     135              : }
     136              : 
     137              : impl<'a> BlockCursor<'a> {
     138       822392 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     139       822392 :         Self::new_with_compression(reader, false)
     140       822392 :     }
     141      1306126 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     142      1306126 :         BlockCursor {
     143      1306126 :             read_compressed,
     144      1306126 :             reader,
     145      1306126 :         }
     146      1306126 :     }
     147              :     // Needed by cli
     148            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     149            0 :         BlockCursor {
     150            0 :             read_compressed: false,
     151            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     152            0 :         }
     153            0 :     }
     154              : 
     155              :     /// Read a block.
     156              :     ///
     157              :     /// Returns a "lease" object that can be used to
     158              :     /// access to the contents of the page. (For the page cache, the
     159              :     /// lease object represents a lock on the buffer.)
     160              :     #[inline(always)]
     161      3047035 :     pub async fn read_blk(
     162      3047035 :         &self,
     163      3047035 :         blknum: u32,
     164      3047035 :         ctx: &RequestContext,
     165      3047035 :     ) -> Result<BlockLease, std::io::Error> {
     166      3047035 :         self.reader.read_blk(blknum, ctx).await
     167      3047035 :     }
     168              : }
     169              : 
     170              : /// An adapter for reading a (virtual) file using the page cache.
     171              : ///
     172              : /// The file is assumed to be immutable. This doesn't provide any functions
     173              : /// for modifying the file, nor for invalidating the cache if it is modified.
     174              : #[derive(Clone)]
     175              : pub struct FileBlockReader<'a> {
     176              :     pub file: &'a VirtualFile,
     177              : 
     178              :     /// Unique ID of this file, used as key in the page cache.
     179              :     file_id: page_cache::FileId,
     180              : 
     181              :     compressed_reads: bool,
     182              : }
     183              : 
     184              : impl<'a> FileBlockReader<'a> {
     185       485538 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     186       485538 :         FileBlockReader {
     187       485538 :             file_id,
     188       485538 :             file,
     189       485538 :             compressed_reads: true,
     190       485538 :         }
     191       485538 :     }
     192              : 
     193              :     /// Read a page from the underlying file into given buffer.
     194        63468 :     async fn fill_buffer(
     195        63468 :         &self,
     196        63468 :         buf: PageWriteGuard<'static>,
     197        63468 :         blkno: u32,
     198        63468 :         ctx: &RequestContext,
     199        63468 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     200        63468 :         assert!(buf.len() == PAGE_SZ);
     201        63468 :         self.file
     202        63468 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     203        63468 :             .await
     204        63468 :     }
     205              :     /// Read a block.
     206              :     ///
     207              :     /// Returns a "lease" object that can be used to
     208              :     /// access to the contents of the page. (For the page cache, the
     209              :     /// lease object represents a lock on the buffer.)
     210       974273 :     pub async fn read_blk<'b>(
     211       974273 :         &self,
     212       974273 :         blknum: u32,
     213       974273 :         ctx: &RequestContext,
     214       974273 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     215       974273 :         let cache = page_cache::get();
     216       974273 :         match cache
     217       974273 :             .read_immutable_buf(self.file_id, blknum, ctx)
     218       974273 :             .await
     219       974273 :             .map_err(|e| {
     220            0 :                 std::io::Error::new(
     221            0 :                     std::io::ErrorKind::Other,
     222            0 :                     format!("Failed to read immutable buf: {e:#}"),
     223            0 :                 )
     224       974273 :             })? {
     225       910805 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     226        63468 :             ReadBufResult::NotFound(write_guard) => {
     227              :                 // Read the page from disk into the buffer
     228        63468 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     229        63468 :                 Ok(write_guard.mark_valid().into())
     230              :             }
     231              :         }
     232       974273 :     }
     233              : }
     234              : 
     235              : impl BlockReader for FileBlockReader<'_> {
     236       483670 :     fn block_cursor(&self) -> BlockCursor<'_> {
     237       483670 :         BlockCursor::new_with_compression(
     238       483670 :             BlockReaderRef::FileBlockReader(self),
     239       483670 :             self.compressed_reads,
     240       483670 :         )
     241       483670 :     }
     242              : }
     243              : 
     244              : ///
     245              : /// Trait for block-oriented output
     246              : ///
     247              : pub trait BlockWriter {
     248              :     ///
     249              :     /// Write a page to the underlying storage.
     250              :     ///
     251              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     252              :     /// written to.
     253              :     ///
     254              :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
     255              : }
     256              : 
     257              : ///
     258              : /// A simple in-memory buffer of blocks.
     259              : ///
     260              : pub struct BlockBuf {
     261              :     pub blocks: Vec<Bytes>,
     262              : }
     263              : impl BlockWriter for BlockBuf {
     264        28960 :     fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
     265        28960 :         assert!(buf.len() == PAGE_SZ);
     266        28960 :         let blknum = self.blocks.len();
     267        28960 :         self.blocks.push(buf);
     268        28960 :         Ok(blknum as u32)
     269        28960 :     }
     270              : }
     271              : 
     272              : impl BlockBuf {
     273         4140 :     pub fn new() -> Self {
     274         4140 :         BlockBuf { blocks: Vec::new() }
     275         4140 :     }
     276              : 
     277      4095304 :     pub fn size(&self) -> u64 {
     278      4095304 :         (self.blocks.len() * PAGE_SZ) as u64
     279      4095304 :     }
     280              : }
     281              : impl Default for BlockBuf {
     282            0 :     fn default() -> Self {
     283            0 :         Self::new()
     284            0 :     }
     285              : }
        

Generated by: LCOV version 2.1-beta