LCOV - code coverage report
Current view: top level - pageserver/src/tenant/ephemeral_file - page_caching.rs (source / functions)
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info
Test Date: 2024-07-03 15:33:13
Coverage: Lines: 63.6 % (124 of 195 hit) | Functions: 70.6 % (12 of 17 hit)

            Line data    Source code
       1              : //! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
       2              : //! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
       3              : 
       4              : use crate::context::RequestContext;
       5              : use crate::page_cache::{self, PAGE_SZ};
       6              : use crate::tenant::block_io::BlockLease;
       7              : use crate::virtual_file::VirtualFile;
       8              : 
       9              : use once_cell::sync::Lazy;
      10              : use std::io::{self, ErrorKind};
      11              : use std::ops::{Deref, Range};
      12              : use tokio_epoll_uring::BoundedBuf;
      13              : use tracing::*;
      14              : 
      15              : use super::zero_padded_read_write;
      16              : 
      17              : /// See module-level comment.
      18              : pub struct RW {
      19              :     page_cache_file_id: page_cache::FileId,
      20              :     rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
      21              : }
      22              : 
      23              : /// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
      24              : /// should we pre-warm the [`crate::page_cache`] with the contents?
      25              : #[derive(Clone, Copy)]
      26              : pub enum PrewarmOnWrite {
      27              :     Yes,
      28              :     No,
      29              : }
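
For orientation, a hypothetical usage sketch of this wrapper (comments only; `file: VirtualFile` and `ctx: RequestContext` are assumed to be obtained elsewhere in the pageserver):

    // Hypothetical usage, assuming `file: VirtualFile` and `ctx: RequestContext`:
    //
    //     let mut rw = RW::new(file, PrewarmOnWrite::Yes);
    //     rw.write_all_borrowed(&bytes, &ctx).await?;   // buffered write path
    //     let lease = rw.read_blk(0, &ctx).await?;      // page_cache-backed read
    //     let bytes = rw.load_to_vec(&ctx).await?;      // PAGE_SZ-padded snapshot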
      30              : 
      31              : impl RW {
      32         1246 :     pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
      33         1246 :         let page_cache_file_id = page_cache::next_file_id();
      34         1246 :         Self {
      35         1246 :             page_cache_file_id,
      36         1246 :             rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
      37         1246 :                 page_cache_file_id,
      38         1246 :                 file,
      39         1246 :                 prewarm_on_write,
      40         1246 :             )),
      41         1246 :         }
      42         1246 :     }
      43              : 
      44         1244 :     pub fn page_cache_file_id(&self) -> page_cache::FileId {
      45         1244 :         self.page_cache_file_id
      46         1244 :     }
      47              : 
      48     10221312 :     pub(crate) async fn write_all_borrowed(
      49     10221312 :         &mut self,
      50     10221312 :         srcbuf: &[u8],
      51     10221312 :         ctx: &RequestContext,
      52     10221312 :     ) -> Result<usize, io::Error> {
      53     10221312 :         // It doesn't make sense to proactively fill the page cache on the Pageserver write path
      54     10221312 :         // because Compute is unlikely to access recently written data.
      55     10221312 :         self.rw.write_all_borrowed(srcbuf, ctx).await
      56     10221312 :     }
      57              : 
      58     10202350 :     pub(crate) fn bytes_written(&self) -> u64 {
      59     10202350 :         self.rw.bytes_written()
      60     10202350 :     }
      61              : 
      62              :     /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
      63              :     ///
      64              :     /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
       65              :     /// The last block is zero-padded to [`PAGE_SZ`], so the returned buffer's length is always a multiple of [`PAGE_SZ`].
      66            0 :     pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
      67              :         // round up to the next PAGE_SZ multiple, required by blob_io
      68            0 :         let size = {
      69            0 :             let s = usize::try_from(self.bytes_written()).unwrap();
      70            0 :             if s % PAGE_SZ == 0 {
      71            0 :                 s
      72              :             } else {
      73            0 :                 s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
      74              :             }
      75              :         };
      76            0 :         let vec = Vec::with_capacity(size);
      77            0 : 
      78            0 :         // read from disk what we've already flushed
      79            0 :         let writer = self.rw.as_writer();
      80            0 :         let flushed_range = writer.written_range();
      81            0 :         let mut vec = writer
      82            0 :             .file
      83            0 :             .read_exact_at(
      84            0 :                 vec.slice(0..(flushed_range.end - flushed_range.start)),
      85            0 :                 u64::try_from(flushed_range.start).unwrap(),
      86            0 :                 ctx,
      87            0 :             )
      88            0 :             .await?
      89            0 :             .into_inner();
      90            0 : 
      91            0 :         // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
      92            0 :         let buffered = self.rw.get_tail_zero_padded();
      93            0 :         vec.extend_from_slice(buffered);
      94            0 :         assert_eq!(vec.len(), size);
      95            0 :         assert_eq!(vec.len() % PAGE_SZ, 0);
      96            0 :         Ok(vec)
      97            0 :     }
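
The sizing step above rounds `bytes_written` up to the next `PAGE_SZ` multiple. A minimal standalone sketch of that arithmetic, assuming `PAGE_SZ` is 8192 as in `crate::page_cache`:

    const PAGE_SZ: usize = 8192; // assumption: matches crate::page_cache::PAGE_SZ

    /// Round `n` up to the next multiple of `align`.
    fn round_up(n: usize, align: usize) -> usize {
        if n % align == 0 {
            n
        } else {
            n.checked_add(align - n % align).unwrap()
        }
    }

    fn main() {
        assert_eq!(round_up(0, PAGE_SZ), 0);
        assert_eq!(round_up(1, PAGE_SZ), PAGE_SZ);
        assert_eq!(round_up(PAGE_SZ, PAGE_SZ), PAGE_SZ);
        assert_eq!(round_up(PAGE_SZ + 1, PAGE_SZ), 2 * PAGE_SZ);
    }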
      98              : 
      99      4955837 :     pub(crate) async fn read_blk(
     100      4955837 :         &self,
     101      4955837 :         blknum: u32,
     102      4955837 :         ctx: &RequestContext,
     103      4955837 :     ) -> Result<BlockLease, io::Error> {
     104      4955837 :         match self.rw.read_blk(blknum).await? {
     105      4302492 :             zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
     106      4302492 :                 let cache = page_cache::get();
     107      4302492 :                 match cache
     108      4302492 :                     .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
     109        53438 :                     .await
     110      4302492 :                     .map_err(|e| {
     111            0 :                         std::io::Error::new(
     112            0 :                             std::io::ErrorKind::Other,
     113            0 :                             // order path before error because error is anyhow::Error => might have many contexts
     114            0 :                             format!(
     115            0 :                                 "ephemeral file: read immutable page #{}: {}: {:#}",
     116            0 :                                 blknum,
     117            0 :                                 self.rw.as_writer().file.path,
     118            0 :                                 e,
     119            0 :                             ),
     120            0 :                         )
     121      4302492 :                     })? {
     122      4248543 :                     page_cache::ReadBufResult::Found(guard) => {
     123      4248543 :                         return Ok(BlockLease::PageReadGuard(guard))
     124              :                     }
     125        53949 :                     page_cache::ReadBufResult::NotFound(write_guard) => {
     126        53949 :                         let write_guard = writer
     127        53949 :                             .file
     128        53949 :                             .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
     129        28159 :                             .await?;
     130        53949 :                         let read_guard = write_guard.mark_valid();
     131        53949 :                         return Ok(BlockLease::PageReadGuard(read_guard));
     132              :                     }
     133              :                 }
     134              :             }
     135       653345 :             zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
     136       653345 :                 Ok(BlockLease::EphemeralFileMutableTail(buffer))
     137              :             }
     138              :         }
     139      4955837 :     }
     140              : }
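
`read_blk` is a read-through cache: a hit returns the cached page, while a miss fills the returned write guard from the `VirtualFile` and marks it valid. A standalone sketch of that control flow, with a `HashMap` standing in for the page cache and a closure for the file read (hypothetical names, not the crate's API):

    use std::collections::HashMap;

    const PAGE_SZ: usize = 8192; // assumption: matches crate::page_cache::PAGE_SZ

    fn read_blk_sketch(
        cache: &mut HashMap<u32, [u8; PAGE_SZ]>,
        blknum: u32,
        read_from_file: impl FnOnce(u32) -> std::io::Result<[u8; PAGE_SZ]>,
    ) -> std::io::Result<[u8; PAGE_SZ]> {
        if let Some(page) = cache.get(&blknum) {
            return Ok(*page); // hit: serve from cache, no file IO
        }
        let page = read_from_file(blknum)?; // miss: fill from the file
        cache.insert(blknum, page);         // corresponds to mark_valid()
        Ok(page)
    }

    fn main() -> std::io::Result<()> {
        let mut cache = HashMap::new();
        read_blk_sketch(&mut cache, 0, |_| Ok([0u8; PAGE_SZ]))?;
        assert!(cache.contains_key(&0)); // the next read of block 0 is a hit
        Ok(())
    }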
     141              : 
     142              : impl Drop for RW {
     143         1118 :     fn drop(&mut self) {
     144         1118 :         // There might still be pages in the [`crate::page_cache`] for this file.
      145         1118 :         // We leave them there; [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
     146         1118 : 
     147         1118 :         // unlink the file
     148         1118 :         let res = std::fs::remove_file(&self.rw.as_writer().file.path);
     149         1118 :         if let Err(e) = res {
     150            2 :             if e.kind() != std::io::ErrorKind::NotFound {
      151              :                 // Never log not-found errors; we cannot do anything about them. On detach,
      152              :                 // the tenant directory is already gone.
     153              :                 //
     154              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
     155            0 :                 error!(
     156            0 :                     "could not remove ephemeral file '{}': {}",
     157            0 :                     self.rw.as_writer().file.path,
     158              :                     e
     159              :                 );
     160            2 :             }
     161         1116 :         }
     162         1118 :     }
     163              : }
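
A standalone sketch of the cleanup convention in the `Drop` impl, with `eprintln!` standing in for `tracing::error!`:

    use std::path::Path;

    /// Unlink a file, treating NotFound as benign: on detach the tenant
    /// directory is already gone, so there is nothing left to do.
    fn remove_ignoring_not_found(path: &Path) {
        if let Err(e) = std::fs::remove_file(path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                eprintln!("could not remove ephemeral file '{}': {}", path.display(), e);
            }
        }
    }

    fn main() {
        // NotFound is swallowed silently; any other error is reported.
        remove_ignoring_not_found(Path::new("/tmp/no-such-ephemeral-file"));
    }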
     164              : 
     165              : struct PreWarmingWriter {
     166              :     prewarm_on_write: PrewarmOnWrite,
     167              :     nwritten_blocks: u32,
     168              :     page_cache_file_id: page_cache::FileId,
     169              :     file: VirtualFile,
     170              : }
     171              : 
     172              : impl PreWarmingWriter {
     173         1246 :     fn new(
     174         1246 :         page_cache_file_id: page_cache::FileId,
     175         1246 :         file: VirtualFile,
     176         1246 :         prewarm_on_write: PrewarmOnWrite,
     177         1246 :     ) -> Self {
     178         1246 :         Self {
     179         1246 :             prewarm_on_write,
     180         1246 :             nwritten_blocks: 0,
     181         1246 :             page_cache_file_id,
     182         1246 :             file,
     183         1246 :         }
     184         1246 :     }
     185              : 
      186              :     /// Return the byte range within `file` that has been written through `write_all`.
     187              :     ///
      188              :     /// The returned range would be invalidated by another `write_all`. To prevent that, the returned wrapper captures the `&self` borrow via the `+ '_` lifetime.
     189            0 :     fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
     190            0 :         let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
     191            0 :         struct Wrapper(Range<usize>);
     192            0 :         impl Deref for Wrapper {
     193            0 :             type Target = Range<usize>;
     194            0 :             fn deref(&self) -> &Range<usize> {
     195            0 :                 &self.0
     196            0 :             }
     197            0 :         }
     198            0 :         Wrapper(0..nwritten_blocks * PAGE_SZ)
     199            0 :     }
     200              : }
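
The `Wrapper`/`Deref` dance in `written_range` exists to tie a computed `Range` to the `&self` borrow: as long as the returned value is alive, the writer stays borrowed, so no `write_all` can invalidate the range. A standalone sketch of the same pattern:

    use std::ops::{Deref, Range};

    const PAGE_SZ: usize = 8192; // assumption: matches crate::page_cache::PAGE_SZ

    struct Writer {
        nwritten_blocks: u32,
    }

    struct Wrapper(Range<usize>);

    impl Deref for Wrapper {
        type Target = Range<usize>;
        fn deref(&self) -> &Range<usize> {
            &self.0
        }
    }

    impl Writer {
        /// The `+ '_` bound makes the returned value extend the borrow of
        /// `self`, even though `Wrapper` holds no reference itself.
        fn written_range(&self) -> impl Deref<Target = Range<usize>> + '_ {
            Wrapper(0..self.nwritten_blocks as usize * PAGE_SZ)
        }
    }

    fn main() {
        let mut w = Writer { nwritten_blocks: 2 };
        let range = w.written_range();
        assert_eq!(*range, 0..2 * PAGE_SZ);
        drop(range);
        w.nwritten_blocks += 1; // allowed again once the range is dropped
    }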
     201              : 
     202              : impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
     203         6610 :     async fn write_all<
     204         6610 :         B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
     205         6610 :         Buf: tokio_epoll_uring::IoBuf + Send,
     206         6610 :     >(
     207         6610 :         &mut self,
     208         6610 :         buf: B,
     209         6610 :         ctx: &RequestContext,
     210         6610 :     ) -> std::io::Result<(usize, B::Buf)> {
     211         6610 :         let buf = buf.slice(..);
     212         6610 :         let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
     213         6610 :         let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
     214         6610 :             Some(buf.to_vec())
     215              :         } else {
     216            0 :             None
     217              :         };
     218         6610 :         let buflen = buf.len();
     219         6610 :         assert_eq!(
     220         6610 :             buflen % PAGE_SZ,
     221              :             0,
     222            0 :             "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
     223              :         );
     224              : 
     225              :         // Do the IO.
     226         6610 :         let iobuf = match self.file.write_all(buf, ctx).await {
     227         6610 :             (iobuf, Ok(nwritten)) => {
     228         6610 :                 assert_eq!(nwritten, buflen);
     229         6610 :                 iobuf
     230              :             }
     231            0 :             (_, Err(e)) => {
     232            0 :                 return Err(std::io::Error::new(
     233            0 :                     ErrorKind::Other,
     234            0 :                     // order error before path because path is long and error is short
     235            0 :                     format!(
     236            0 :                         "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
     237            0 :                         self.nwritten_blocks, buflen, e, self.file.path,
     238            0 :                     ),
     239            0 :                 ));
     240              :             }
     241              :         };
     242              : 
     243              :         // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf)
     244         6610 :         let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
     245         6610 :         if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
     246         6610 :             assert_eq!(&check_bounds_stuff_works, &*buf);
     247            0 :         }
     248              : 
     249         6610 :         let nblocks = buflen / PAGE_SZ;
     250         6610 :         let nblocks32 = u32::try_from(nblocks).unwrap();
     251              : 
     252         6610 :         if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
     253              :             // Pre-warm page cache with the contents.
     254              :             // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
     255              :             // benefits the code that writes InMemoryLayer=>L0 layers.
     256              : 
     257         6610 :             let cache = page_cache::get();
     258           28 :             static CTX: Lazy<RequestContext> = Lazy::new(|| {
     259           28 :                 RequestContext::new(
     260           28 :                     crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
     261           28 :                     crate::context::DownloadBehavior::Error,
     262           28 :                 )
     263           28 :             });
     264        52880 :             for blknum_in_buffer in 0..nblocks {
     265        52880 :                 let blk_in_buffer =
     266        52880 :                     &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
     267        52880 :                 let blknum = self
     268        52880 :                     .nwritten_blocks
     269        52880 :                     .checked_add(blknum_in_buffer as u32)
     270        52880 :                     .unwrap();
     271        52880 :                 match cache
     272        52880 :                     .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
     273          261 :                     .await
     274              :                 {
     275            0 :                     Err(e) => {
     276            0 :                         error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     277              :                         // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     278              :                     }
     279        52880 :                     Ok(v) => match v {
     280            0 :                         page_cache::ReadBufResult::Found(_guard) => {
     281            0 :                             // This function takes &mut self, so, it shouldn't be possible to reach this point.
     282            0 :                             unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
     283            0 :                                       and this function takes &mut self, so, no concurrent read_blk is possible");
     284              :                         }
     285        52880 :                         page_cache::ReadBufResult::NotFound(mut write_guard) => {
     286        52880 :                             write_guard.copy_from_slice(blk_in_buffer);
     287        52880 :                             let _ = write_guard.mark_valid();
     288        52880 :                         }
     289              :                     },
     290              :                 }
     291              :             }
     292            0 :         }
     293              : 
     294         6610 :         self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
     295         6610 :         Ok((buflen, buf.into_inner()))
     296         6610 :     }
     297              : }
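
Two conventions in `write_all` are worth isolating: the owned-buffer IO style (the buffer is consumed for the duration of the IO and handed back alongside the result, as `BoundedBuf`/`Slice` do above) and the page-at-a-time pre-warming loop. A combined standalone sketch with stand-in types (a `Vec<u8>` for the file, a `HashMap` for the page cache; `chunks_exact` replaces the manual index arithmetic):

    use std::collections::HashMap;

    const PAGE_SZ: usize = 8192; // assumption: matches crate::page_cache::PAGE_SZ

    fn write_all_prewarm(
        file: &mut Vec<u8>,                // stand-in for VirtualFile
        cache: &mut HashMap<u32, Vec<u8>>, // stand-in for the page cache
        nwritten_blocks: &mut u32,
        buf: Vec<u8>,                      // consumed; returned to the caller below
    ) -> (Vec<u8>, std::io::Result<usize>) {
        assert_eq!(buf.len() % PAGE_SZ, 0); // writes are always whole pages
        file.extend_from_slice(&buf);       // the "IO"
        for (i, page) in buf.chunks_exact(PAGE_SZ).enumerate() {
            let blknum = nwritten_blocks.checked_add(i as u32).unwrap();
            cache.insert(blknum, page.to_vec()); // copy_from_slice + mark_valid stand-in
        }
        *nwritten_blocks += (buf.len() / PAGE_SZ) as u32;
        let n = buf.len();
        (buf, Ok(n)) // hand the buffer back together with the result
    }

    fn main() {
        let (mut file, mut cache, mut nwritten) = (Vec::new(), HashMap::new(), 0u32);
        let (_buf, res) =
            write_all_prewarm(&mut file, &mut cache, &mut nwritten, vec![0u8; 2 * PAGE_SZ]);
        assert_eq!(res.unwrap(), 2 * PAGE_SZ);
        assert!(cache.contains_key(&0) && cache.contains_key(&1));
        assert_eq!(nwritten, 2);
    }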
        

Generated by: LCOV version 2.1-beta