LCOV - code coverage report
Current view: top level - pageserver/src/tenant - block_io.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 89.5 % 105 94
Test Date: 2025-04-24 20:31:15 Functions: 82.6 % 23 19

            Line data    Source code
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use std::ops::Deref;
       6              : 
       7              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       8              : use crate::context::RequestContext;
       9              : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
      10              : #[cfg(test)]
      11              : use crate::virtual_file::IoBufferMut;
      12              : use crate::virtual_file::{IoBuffer, VirtualFile};
      13              : 
      14              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      15              : /// blocks, using the page cache
      16              : ///
      17              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      18              : /// below.
      19              : pub trait BlockReader {
      20              :     ///
      21              :     /// Create a new "cursor" for reading from this reader.
      22              :     ///
      23              :     /// A cursor caches the last accessed page, allowing for faster
      24              :     /// access if the same block is accessed repeatedly.
      25              :     fn block_cursor(&self) -> BlockCursor<'_>;
      26              : }
      27              : 
      28              : impl<B> BlockReader for &B
      29              : where
      30              :     B: BlockReader,
      31              : {
      32          120 :     fn block_cursor(&self) -> BlockCursor<'_> {
      33          120 :         (*self).block_cursor()
      34          120 :     }
      35              : }
      36              : 
      37              : /// Reference to an in-memory copy of an immutable on-disk block.
      38              : pub enum BlockLease<'a> {
      39              :     PageReadGuard(PageReadGuard<'static>),
      40              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      41              :     Slice(&'a [u8; PAGE_SZ]),
      42              :     #[cfg(test)]
      43              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      44              :     #[cfg(test)]
      45              :     IoBufferMut(IoBufferMut),
      46              : }
      47              : 
      48              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      49      3384956 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      50      3384956 :         BlockLease::PageReadGuard(value)
      51      3384956 :     }
      52              : }
      53              : 
      54              : #[cfg(test)]
      55              : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
      56      6100260 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      57      6100260 :         BlockLease::Arc(value)
      58      6100260 :     }
      59              : }
      60              : 
      61              : impl Deref for BlockLease<'_> {
      62              :     type Target = [u8; PAGE_SZ];
      63              : 
      64      9591572 :     fn deref(&self) -> &Self::Target {
      65      9591572 :         match self {
      66      3385436 :             BlockLease::PageReadGuard(v) => v.deref(),
      67            0 :             BlockLease::EphemeralFileMutableTail(v) => v,
      68            0 :             BlockLease::Slice(v) => v,
      69              :             #[cfg(test)]
      70      6100260 :             BlockLease::Arc(v) => v.deref(),
      71              :             #[cfg(test)]
      72       105876 :             BlockLease::IoBufferMut(v) => {
      73       105876 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      74              :             }
      75              :         }
      76      9591572 :     }
      77              : }
      78              : 
      79              : /// Provides the ability to read blocks from different sources,
      80              : /// similar to using traits for this purpose.
      81              : ///
      82              : /// Unlike traits, we also support the read function to be async though.
      83              : pub(crate) enum BlockReaderRef<'a> {
      84              :     FileBlockReader(&'a FileBlockReader<'a>),
      85              :     Adapter(Adapter<&'a DeltaLayerInner>),
      86              :     #[cfg(test)]
      87              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      88              :     #[cfg(test)]
      89              :     VirtualFile(&'a VirtualFile),
      90              : }
      91              : 
      92              : impl BlockReaderRef<'_> {
      93              :     #[inline(always)]
      94      9540452 :     async fn read_blk(
      95      9540452 :         &self,
      96      9540452 :         blknum: u32,
      97      9540452 :         ctx: &RequestContext,
      98      9540452 :     ) -> Result<BlockLease, std::io::Error> {
      99              :         use BlockReaderRef::*;
     100      9540452 :         match self {
     101      3377396 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     102           48 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     103              :             #[cfg(test)]
     104      6100260 :             TestDisk(r) => r.read_blk(blknum),
     105              :             #[cfg(test)]
     106        62748 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     107              :         }
     108      9540452 :     }
     109              : }
     110              : 
     111              : ///
     112              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     113              : ///
     114              : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
     115              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     116              : /// the underlying BlockReader. For example:
     117              : ///
     118              : /// ```no_run
     119              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     120              : /// # use pageserver::context::RequestContext;
     121              : /// # let reader: FileBlockReader = unimplemented!("stub");
     122              : /// # let ctx: RequestContext = unimplemented!("stub");
     123              : /// let cursor = reader.block_cursor();
     124              : /// let buf = cursor.read_blk(1, &ctx);
     125              : /// // do stuff with 'buf'
     126              : /// let buf = cursor.read_blk(2, &ctx);
     127              : /// // do stuff with 'buf'
     128              : /// ```
     129              : ///
     130              : pub struct BlockCursor<'a> {
     131              :     pub(super) read_compressed: bool,
     132              :     reader: BlockReaderRef<'a>,
     133              : }
     134              : 
     135              : impl<'a> BlockCursor<'a> {
     136      2467209 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     137      2467209 :         Self::new_with_compression(reader, false)
     138      2467209 :     }
     139      4193572 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     140      4193572 :         BlockCursor {
     141      4193572 :             read_compressed,
     142      4193572 :             reader,
     143      4193572 :         }
     144      4193572 :     }
     145              :     // Needed by cli
     146            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     147            0 :         BlockCursor {
     148            0 :             read_compressed: false,
     149            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     150            0 :         }
     151            0 :     }
     152              : 
     153              :     /// Read a block.
     154              :     ///
     155              :     /// Returns a "lease" object that can be used to
     156              :     /// access to the contents of the page. (For the page cache, the
     157              :     /// lease object represents a lock on the buffer.)
     158              :     #[inline(always)]
     159      9540452 :     pub async fn read_blk(
     160      9540452 :         &self,
     161      9540452 :         blknum: u32,
     162      9540452 :         ctx: &RequestContext,
     163      9540452 :     ) -> Result<BlockLease, std::io::Error> {
     164      9540452 :         self.reader.read_blk(blknum, ctx).await
     165      9540452 :     }
     166              : }
     167              : 
     168              : /// An adapter for reading a (virtual) file using the page cache.
     169              : ///
     170              : /// The file is assumed to be immutable. This doesn't provide any functions
     171              : /// for modifying the file, nor for invalidating the cache if it is modified.
     172              : #[derive(Clone)]
     173              : pub struct FileBlockReader<'a> {
     174              :     pub file: &'a VirtualFile,
     175              : 
     176              :     /// Unique ID of this file, used as key in the page cache.
     177              :     file_id: page_cache::FileId,
     178              : 
     179              :     compressed_reads: bool,
     180              : }
     181              : 
     182              : impl<'a> FileBlockReader<'a> {
     183      1615147 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     184      1615147 :         FileBlockReader {
     185      1615147 :             file_id,
     186      1615147 :             file,
     187      1615147 :             compressed_reads: true,
     188      1615147 :         }
     189      1615147 :     }
     190              : 
     191              :     /// Read a page from the underlying file into given buffer.
     192       185181 :     async fn fill_buffer(
     193       185181 :         &self,
     194       185181 :         buf: PageWriteGuard<'static>,
     195       185181 :         blkno: u32,
     196       185181 :         ctx: &RequestContext,
     197       185181 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     198       185181 :         assert!(buf.len() == PAGE_SZ);
     199       185181 :         self.file
     200       185181 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     201       185181 :             .await
     202       185181 :     }
     203              :     /// Read a block.
     204              :     ///
     205              :     /// Returns a "lease" object that can be used to
     206              :     /// access to the contents of the page. (For the page cache, the
     207              :     /// lease object represents a lock on the buffer.)
     208      3384956 :     pub async fn read_blk<'b>(
     209      3384956 :         &self,
     210      3384956 :         blknum: u32,
     211      3384956 :         ctx: &RequestContext,
     212      3384956 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     213      3384956 :         let cache = page_cache::get();
     214      3384956 :         match cache
     215      3384956 :             .read_immutable_buf(self.file_id, blknum, ctx)
     216      3384956 :             .await
     217      3384956 :             .map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
     218              :         {
     219      3199775 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     220       185181 :             ReadBufResult::NotFound(write_guard) => {
     221              :                 // Read the page from disk into the buffer
     222       185181 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     223       185181 :                 Ok(write_guard.mark_valid().into())
     224              :             }
     225              :         }
     226      3384956 :     }
     227              : }
     228              : 
     229              : impl BlockReader for FileBlockReader<'_> {
     230      1726267 :     fn block_cursor(&self) -> BlockCursor<'_> {
     231      1726267 :         BlockCursor::new_with_compression(
     232      1726267 :             BlockReaderRef::FileBlockReader(self),
     233      1726267 :             self.compressed_reads,
     234      1726267 :         )
     235      1726267 :     }
     236              : }
     237              : 
     238              : ///
     239              : /// Trait for block-oriented output
     240              : ///
     241              : pub trait BlockWriter {
     242              :     ///
     243              :     /// Write a page to the underlying storage.
     244              :     ///
     245              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     246              :     /// written to.
     247              :     ///
     248              :     fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
     249              : }
     250              : 
     251              : ///
     252              : /// A simple in-memory buffer of blocks.
     253              : ///
     254              : pub struct BlockBuf {
     255              :     pub blocks: Vec<IoBuffer>,
     256              : }
     257              : impl BlockWriter for BlockBuf {
     258        84683 :     fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
     259        84683 :         assert!(buf.len() == PAGE_SZ);
     260        84683 :         let blknum = self.blocks.len();
     261        84683 :         self.blocks.push(buf);
     262        84683 :         Ok(blknum as u32)
     263        84683 :     }
     264              : }
     265              : 
     266              : impl BlockBuf {
     267        12600 :     pub fn new() -> Self {
     268        12600 :         BlockBuf { blocks: Vec::new() }
     269        12600 :     }
     270              : 
     271     12285996 :     pub fn size(&self) -> u64 {
     272     12285996 :         (self.blocks.len() * PAGE_SZ) as u64
     273     12285996 :     }
     274              : }
     275              : impl Default for BlockBuf {
     276            0 :     fn default() -> Self {
     277            0 :         Self::new()
     278            0 :     }
     279              : }
        

Generated by: LCOV version 2.1-beta