|             Line data    Source code 
       1              : //!
       2              : //! Low-level Block-oriented I/O functions
       3              : //!
       4              : 
       5              : use std::ops::Deref;
       6              : 
       7              : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
       8              : use crate::context::RequestContext;
       9              : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
      10              : #[cfg(test)]
      11              : use crate::virtual_file::IoBufferMut;
      12              : use crate::virtual_file::{IoBuffer, VirtualFile};
      13              : 
      14              : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
      15              : /// blocks, using the page cache
      16              : ///
      17              : /// There are currently two implementations: EphemeralFile, and FileBlockReader
      18              : /// below.
      19              : pub trait BlockReader {
      20              :     ///
      21              :     /// Create a new "cursor" for reading from this reader.
      22              :     ///
      23              :     /// A cursor caches the last accessed page, allowing for faster
      24              :     /// access if the same block is accessed repeatedly.
      25              :     fn block_cursor(&self) -> BlockCursor<'_>;
      26              : }
      27              : 
      28              : impl<B> BlockReader for &B
      29              : where
      30              :     B: BlockReader,
      31              : {
      32           10 :     fn block_cursor(&self) -> BlockCursor<'_> {
      33           10 :         (*self).block_cursor()
      34           10 :     }
      35              : }
      36              : 
      37              : /// Reference to an in-memory copy of an immutable on-disk block.
      38              : pub enum BlockLease<'a> {
      39              :     PageReadGuard(PageReadGuard<'static>),
      40              :     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
      41              :     Slice(&'a [u8; PAGE_SZ]),
      42              :     #[cfg(test)]
      43              :     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
      44              :     #[cfg(test)]
      45              :     IoBufferMut(IoBufferMut),
      46              : }
      47              : 
      48              : impl From<PageReadGuard<'static>> for BlockLease<'static> {
      49       296965 :     fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
      50       296965 :         BlockLease::PageReadGuard(value)
      51       296965 :     }
      52              : }
      53              : 
      54              : #[cfg(test)]
      55              : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
      56       508346 :     fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
      57       508346 :         BlockLease::Arc(value)
      58       508346 :     }
      59              : }
      60              : 
      61              : impl Deref for BlockLease<'_> {
      62              :     type Target = [u8; PAGE_SZ];
      63              : 
      64       814174 :     fn deref(&self) -> &Self::Target {
      65       814174 :         match self {
      66       297005 :             BlockLease::PageReadGuard(v) => v.deref(),
      67            0 :             BlockLease::EphemeralFileMutableTail(v) => v,
      68            0 :             BlockLease::Slice(v) => v,
      69              :             #[cfg(test)]
      70       508346 :             BlockLease::Arc(v) => v.deref(),
      71              :             #[cfg(test)]
      72         8823 :             BlockLease::IoBufferMut(v) => {
      73         8823 :                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
      74              :             }
      75              :         }
      76       814174 :     }
      77              : }
      78              : 
      79              : /// Provides the ability to read blocks from different sources,
      80              : /// similar to using traits for this purpose.
      81              : ///
      82              : /// Unlike traits, we also support the read function to be async though.
      83              : pub(crate) enum BlockReaderRef<'a> {
      84              :     FileBlockReader(&'a FileBlockReader<'a>),
      85              :     Adapter(Adapter<&'a DeltaLayerInner>),
      86              :     #[cfg(test)]
      87              :     TestDisk(&'a super::disk_btree::tests::TestDisk),
      88              :     #[cfg(test)]
      89              :     VirtualFile(&'a VirtualFile),
      90              : }
      91              : 
      92              : impl BlockReaderRef<'_> {
      93              :     #[inline(always)]
      94       809905 :     async fn read_blk(
      95       809905 :         &self,
      96       809905 :         blknum: u32,
      97       809905 :         ctx: &RequestContext,
      98       809905 :     ) -> Result<BlockLease, std::io::Error> {
      99              :         use BlockReaderRef::*;
     100       809905 :         match self {
     101       296326 :             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
     102            4 :             Adapter(r) => r.read_blk(blknum, ctx).await,
     103              :             #[cfg(test)]
     104       508346 :             TestDisk(r) => r.read_blk(blknum),
     105              :             #[cfg(test)]
     106         5229 :             VirtualFile(r) => r.read_blk(blknum, ctx).await,
     107              :         }
     108       809905 :     }
     109              : }
     110              : 
     111              : ///
     112              : /// A "cursor" for efficiently reading multiple pages from a BlockReader
     113              : ///
     114              : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
     115              : /// that in many cases you can use a BlockCursor as a drop-in replacement for
     116              : /// the underlying BlockReader. For example:
     117              : ///
     118              : /// ```no_run
     119              : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
     120              : /// # use pageserver::context::RequestContext;
     121              : /// # let reader: FileBlockReader = unimplemented!("stub");
     122              : /// # let ctx: RequestContext = unimplemented!("stub");
     123              : /// let cursor = reader.block_cursor();
     124              : /// let buf = cursor.read_blk(1, &ctx);
     125              : /// // do stuff with 'buf'
     126              : /// let buf = cursor.read_blk(2, &ctx);
     127              : /// // do stuff with 'buf'
     128              : /// ```
     129              : ///
     130              : pub struct BlockCursor<'a> {
     131              :     pub(super) read_compressed: bool,
     132              :     reader: BlockReaderRef<'a>,
     133              : }
     134              : 
     135              : impl<'a> BlockCursor<'a> {
     136       205596 :     pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
     137       205596 :         Self::new_with_compression(reader, false)
     138       205596 :     }
     139       355740 :     pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
     140       355740 :         BlockCursor {
     141       355740 :             read_compressed,
     142       355740 :             reader,
     143       355740 :         }
     144       355740 :     }
     145              :     // Needed by cli
     146            0 :     pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
     147            0 :         BlockCursor {
     148            0 :             read_compressed: false,
     149            0 :             reader: BlockReaderRef::FileBlockReader(reader),
     150            0 :         }
     151            0 :     }
     152              : 
     153              :     /// Read a block.
     154              :     ///
     155              :     /// Returns a "lease" object that can be used to
     156              :     /// access to the contents of the page. (For the page cache, the
     157              :     /// lease object represents a lock on the buffer.)
     158              :     #[inline(always)]
     159       809905 :     pub async fn read_blk(
     160       809905 :         &self,
     161       809905 :         blknum: u32,
     162       809905 :         ctx: &RequestContext,
     163       809905 :     ) -> Result<BlockLease, std::io::Error> {
     164       809905 :         self.reader.read_blk(blknum, ctx).await
     165       809905 :     }
     166              : }
     167              : 
     168              : /// An adapter for reading a (virtual) file using the page cache.
     169              : ///
     170              : /// The file is assumed to be immutable. This doesn't provide any functions
     171              : /// for modifying the file, nor for invalidating the cache if it is modified.
     172              : #[derive(Clone)]
     173              : pub struct FileBlockReader<'a> {
     174              :     pub file: &'a VirtualFile,
     175              : 
     176              :     /// Unique ID of this file, used as key in the page cache.
     177              :     file_id: page_cache::FileId,
     178              : 
     179              :     compressed_reads: bool,
     180              : }
     181              : 
     182              : impl<'a> FileBlockReader<'a> {
     183       141089 :     pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
     184       141089 :         FileBlockReader {
     185       141089 :             file_id,
     186       141089 :             file,
     187       141089 :             compressed_reads: true,
     188       141089 :         }
     189       141089 :     }
     190              : 
     191              :     /// Read a page from the underlying file into given buffer.
     192        17369 :     async fn fill_buffer(
     193        17369 :         &self,
     194        17369 :         buf: PageWriteGuard<'static>,
     195        17369 :         blkno: u32,
     196        17369 :         ctx: &RequestContext,
     197        17369 :     ) -> Result<PageWriteGuard<'static>, std::io::Error> {
     198        17369 :         assert!(buf.len() == PAGE_SZ);
     199        17369 :         self.file
     200        17369 :             .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
     201        17369 :             .await
     202        17369 :     }
     203              :     /// Read a block.
     204              :     ///
     205              :     /// Returns a "lease" object that can be used to
     206              :     /// access to the contents of the page. (For the page cache, the
     207              :     /// lease object represents a lock on the buffer.)
     208       296965 :     pub async fn read_blk<'b>(
     209       296965 :         &self,
     210       296965 :         blknum: u32,
     211       296965 :         ctx: &RequestContext,
     212       296965 :     ) -> Result<BlockLease<'b>, std::io::Error> {
     213       296965 :         let cache = page_cache::get();
     214       296965 :         match cache
     215       296965 :             .read_immutable_buf(self.file_id, blknum, ctx)
     216       296965 :             .await
     217       296965 :             .map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
     218              :         {
     219       279596 :             ReadBufResult::Found(guard) => Ok(guard.into()),
     220        17369 :             ReadBufResult::NotFound(write_guard) => {
     221              :                 // Read the page from disk into the buffer
     222        17369 :                 let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
     223        17369 :                 Ok(write_guard.mark_valid().into())
     224              :             }
     225              :         }
     226       296965 :     }
     227              : }
     228              : 
     229              : impl BlockReader for FileBlockReader<'_> {
     230       150136 :     fn block_cursor(&self) -> BlockCursor<'_> {
     231       150136 :         BlockCursor::new_with_compression(
     232       150136 :             BlockReaderRef::FileBlockReader(self),
     233       150136 :             self.compressed_reads,
     234              :         )
     235       150136 :     }
     236              : }
     237              : 
     238              : ///
     239              : /// Trait for block-oriented output
     240              : ///
     241              : pub trait BlockWriter {
     242              :     ///
     243              :     /// Write a page to the underlying storage.
     244              :     ///
     245              :     /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
     246              :     /// written to.
     247              :     ///
     248              :     fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
     249              : }
     250              : 
     251              : ///
     252              : /// A simple in-memory buffer of blocks.
     253              : ///
     254              : pub struct BlockBuf {
     255              :     pub blocks: Vec<IoBuffer>,
     256              : }
     257              : impl BlockWriter for BlockBuf {
     258         7076 :     fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
     259         7076 :         assert!(buf.len() == PAGE_SZ);
     260         7076 :         let blknum = self.blocks.len();
     261         7076 :         self.blocks.push(buf);
     262         7076 :         Ok(blknum as u32)
     263         7076 :     }
     264              : }
     265              : 
     266              : impl BlockBuf {
     267         1117 :     pub fn new() -> Self {
     268         1117 :         BlockBuf { blocks: Vec::new() }
     269         1117 :     }
     270              : 
     271      1029483 :     pub fn size(&self) -> u64 {
     272      1029483 :         (self.blocks.len() * PAGE_SZ) as u64
     273      1029483 :     }
     274              : }
     275              : impl Default for BlockBuf {
     276            0 :     fn default() -> Self {
     277            0 :         Self::new()
     278            0 :     }
     279              : }
         |