LCOV - code coverage report
Current view: top level - pageserver/src/tenant/ephemeral_file - page_caching.rs (source / functions)
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info
Test Date: 2024-06-24 16:38:41

                 Coverage    Total    Hit
    Lines:         78.9 %      147    116
    Functions:     92.3 %       13     12

            Line data    Source code
       1              : //! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
       2              : //! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
       3              : 
       4              : use crate::context::RequestContext;
       5              : use crate::page_cache::{self, PAGE_SZ};
       6              : use crate::tenant::block_io::BlockLease;
       7              : use crate::virtual_file::VirtualFile;
       8              : 
       9              : use once_cell::sync::Lazy;
      10              : use std::io::{self, ErrorKind};
      11              : use tokio_epoll_uring::BoundedBuf;
      12              : use tracing::*;
      13              : 
      14              : use super::zero_padded_read_write;
      15              : 
      16              : /// See module-level comment.
      17              : pub struct RW {
      18              :     page_cache_file_id: page_cache::FileId,
      19              :     rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
      20              : }
      21              : 
      22              : impl RW {
      23         1243 :     pub fn new(file: VirtualFile) -> Self {
      24         1243 :         let page_cache_file_id = page_cache::next_file_id();
      25         1243 :         Self {
      26         1243 :             page_cache_file_id,
      27         1243 :             rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
      28         1243 :                 page_cache_file_id,
      29         1243 :                 file,
      30         1243 :             )),
      31         1243 :         }
      32         1243 :     }
      33              : 
      34         1241 :     pub fn page_cache_file_id(&self) -> page_cache::FileId {
      35         1241 :         self.page_cache_file_id
      36         1241 :     }
      37              : 
      38     10221264 :     pub(crate) async fn write_all_borrowed(
      39     10221264 :         &mut self,
      40     10221264 :         srcbuf: &[u8],
      41     10221264 :         ctx: &RequestContext,
      42     10221264 :     ) -> Result<usize, io::Error> {
      43     10221264 :         // It doesn't make sense to proactively fill the page cache on the Pageserver write path
      44     10221264 :         // because Compute is unlikely to access recently written data.
      45     10221264 :         self.rw.write_all_borrowed(srcbuf, ctx).await
      46     10221264 :     }
      47              : 
      48     10202299 :     pub(crate) fn bytes_written(&self) -> u64 {
      49     10202299 :         self.rw.bytes_written()
      50     10202299 :     }
      51              : 
      52      4956175 :     pub(crate) async fn read_blk(
      53      4956175 :         &self,
      54      4956175 :         blknum: u32,
      55      4956175 :         ctx: &RequestContext,
      56      4956175 :     ) -> Result<BlockLease, io::Error> {
      57      4956175 :         match self.rw.read_blk(blknum).await? {
      58      4302702 :             zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
      59      4302702 :                 let cache = page_cache::get();
      60      4302702 :                 match cache
      61      4302702 :                     .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
      62        53468 :                     .await
      63      4302702 :                     .map_err(|e| {
      64            0 :                         std::io::Error::new(
      65            0 :                             std::io::ErrorKind::Other,
      66            0 :                             // order path before error because error is anyhow::Error => might have many contexts
      67            0 :                             format!(
      68            0 :                                 "ephemeral file: read immutable page #{}: {}: {:#}",
      69            0 :                                 blknum,
      70            0 :                                 self.rw.as_writer().file.path,
      71            0 :                                 e,
      72            0 :                             ),
      73            0 :                         )
      74      4302702 :                     })? {
      75      4248768 :                     page_cache::ReadBufResult::Found(guard) => {
      76      4248768 :                         return Ok(BlockLease::PageReadGuard(guard))
      77              :                     }
      78        53934 :                     page_cache::ReadBufResult::NotFound(write_guard) => {
      79        53934 :                         let write_guard = writer
      80        53934 :                             .file
      81        53934 :                             .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
      82        28156 :                             .await?;
      83        53934 :                         let read_guard = write_guard.mark_valid();
      84        53934 :                         return Ok(BlockLease::PageReadGuard(read_guard));
      85              :                     }
      86              :                 }
      87              :             }
      88       653473 :             zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
      89       653473 :                 Ok(BlockLease::EphemeralFileMutableTail(buffer))
      90              :             }
      91              :         }
      92      4956175 :     }
      93              : }
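
// Hedged usage sketch (not part of the covered source): the real call sites live in the
// surrounding `ephemeral_file` module, and `demo` below is a made-up name used only to
// illustrate the `RW` API shown above. Writes land in the zero-padded mutable tail; reads
// of already-flushed blocks are served through the page cache.
async fn demo(file: VirtualFile, ctx: &RequestContext) -> io::Result<()> {
    let mut rw = RW::new(file);
    // Append one page worth of data; nothing is pre-warmed on this path.
    rw.write_all_borrowed(&[0u8; PAGE_SZ], ctx).await?;
    // Read it back: served from the mutable tail, or from the page cache once flushed.
    let _lease = rw.read_blk(0, ctx).await?;
    Ok(())
}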
      94              : 
      95              : impl Drop for RW {
      96         1115 :     fn drop(&mut self) {
      97         1115 :         // There might still be pages in the [`crate::page_cache`] for this file.
       98         1115 :         // We leave them there; [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
      99         1115 : 
     100         1115 :         // unlink the file
     101         1115 :         let res = std::fs::remove_file(&self.rw.as_writer().file.path);
     102         1115 :         if let Err(e) = res {
     103            3 :             if e.kind() != std::io::ErrorKind::NotFound {
      104              :                 // Never log NotFound errors: there is nothing we can do about them, and on
      105              :                 // detach the tenant directory is already gone.
      106              :                 //
      107              :                 // NotFound here might also be related to https://github.com/neondatabase/neon/issues/2442
     108            0 :                 error!(
     109            0 :                     "could not remove ephemeral file '{}': {}",
     110            0 :                     self.rw.as_writer().file.path,
     111              :                     e
     112              :                 );
     113            3 :             }
     114         1112 :         }
     115         1115 :     }
     116              : }
     117              : 
     118              : struct PreWarmingWriter {
     119              :     nwritten_blocks: u32,
     120              :     page_cache_file_id: page_cache::FileId,
     121              :     file: VirtualFile,
     122              : }
     123              : 
     124              : impl PreWarmingWriter {
     125         1243 :     fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self {
     126         1243 :         Self {
     127         1243 :             nwritten_blocks: 0,
     128         1243 :             page_cache_file_id,
     129         1243 :             file,
     130         1243 :         }
     131         1243 :     }
     132              : }
     133              : 
     134              : impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
     135         6610 :     async fn write_all<
     136         6610 :         B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
     137         6610 :         Buf: tokio_epoll_uring::IoBuf + Send,
     138         6610 :     >(
     139         6610 :         &mut self,
     140         6610 :         buf: B,
     141         6610 :         ctx: &RequestContext,
     142         6610 :     ) -> std::io::Result<(usize, B::Buf)> {
     143         6610 :         let buf = buf.slice(..);
     144         6610 :         let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
     145         6610 :         let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
     146         6610 :             Some(buf.to_vec())
     147              :         } else {
     148            0 :             None
     149              :         };
     150         6610 :         let buflen = buf.len();
     151         6610 :         assert_eq!(
     152         6610 :             buflen % PAGE_SZ,
     153              :             0,
     154            0 :             "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
     155              :         );
     156              : 
     157              :         // Do the IO.
     158         6610 :         let iobuf = match self.file.write_all(buf, ctx).await {
     159         6610 :             (iobuf, Ok(nwritten)) => {
     160         6610 :                 assert_eq!(nwritten, buflen);
     161         6610 :                 iobuf
     162              :             }
     163            0 :             (_, Err(e)) => {
     164            0 :                 return Err(std::io::Error::new(
     165            0 :                     ErrorKind::Other,
     166            0 :                     // order error before path because path is long and error is short
     167            0 :                     format!(
     168            0 :                         "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
     169            0 :                         self.nwritten_blocks, buflen, e, self.file.path,
     170            0 :                     ),
     171            0 :                 ));
     172              :             }
     173              :         };
     174              : 
     175              :         // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf)
     176         6610 :         let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
     177         6610 :         if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
     178         6610 :             assert_eq!(&check_bounds_stuff_works, &*buf);
     179            0 :         }
     180              : 
     181              :         // Pre-warm page cache with the contents.
     182              :         // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
     183              :         // benefits the code that writes InMemoryLayer=>L0 layers.
     184         6610 :         let nblocks = buflen / PAGE_SZ;
     185         6610 :         let nblocks32 = u32::try_from(nblocks).unwrap();
     186         6610 :         let cache = page_cache::get();
     187           28 :         static CTX: Lazy<RequestContext> = Lazy::new(|| {
     188           28 :             RequestContext::new(
     189           28 :                 crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
     190           28 :                 crate::context::DownloadBehavior::Error,
     191           28 :             )
     192           28 :         });
     193        52880 :         for blknum_in_buffer in 0..nblocks {
     194        52880 :             let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
     195        52880 :             let blknum = self
     196        52880 :                 .nwritten_blocks
     197        52880 :                 .checked_add(blknum_in_buffer as u32)
     198        52880 :                 .unwrap();
     199        52880 :             match cache
     200        52880 :                 .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
     201          257 :                 .await
     202              :             {
     203            0 :                 Err(e) => {
     204            0 :                     error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     205              :                     // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     206              :                 }
     207        52880 :                 Ok(v) => match v {
     208            0 :                     page_cache::ReadBufResult::Found(_guard) => {
     209            0 :                         // This function takes &mut self, so, it shouldn't be possible to reach this point.
     210            0 :                         unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
     211            0 :                                       and this function takes &mut self, so, no concurrent read_blk is possible");
     212              :                     }
     213        52880 :                     page_cache::ReadBufResult::NotFound(mut write_guard) => {
     214        52880 :                         write_guard.copy_from_slice(blk_in_buffer);
     215        52880 :                         let _ = write_guard.mark_valid();
     216        52880 :                     }
     217              :                 },
     218              :             }
     219              :         }
     220         6610 :         self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
     221         6610 :         Ok((buflen, buf.into_inner()))
     222         6610 :     }
     223              : }
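
// Hedged illustration (not part of the covered source; `pre_warm_block_numbers` is a made-up
// helper): the pre-warming loop above keys cache pages by a global block number, computed as
// the running count of blocks already written plus the block's index within the flushed
// buffer; the corresponding byte offset in the VirtualFile is that block number times PAGE_SZ.
fn pre_warm_block_numbers(nwritten_blocks: u32, buflen: usize) -> Vec<(u32, u64)> {
    // Mirrors the assertion in write_all: flushes are whole multiples of PAGE_SZ.
    assert_eq!(buflen % PAGE_SZ, 0);
    (0..buflen / PAGE_SZ)
        .map(|i| {
            let blknum = nwritten_blocks
                .checked_add(u32::try_from(i).unwrap())
                .unwrap();
            let byte_offset = blknum as u64 * PAGE_SZ as u64;
            (blknum, byte_offset)
        })
        .collect()
}
// For example, with nwritten_blocks = 2 and a 64 KiB flush (8 blocks), the cached block
// numbers are 2..=9, at byte offsets 16384, 24576, ..., 73728.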
        

Generated by: LCOV version 2.1-beta