LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 88.4 % 242 214
Test Date: 2023-09-06 10:18:01 Functions: 83.3 % 18 15

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::page_cache::{self, PAGE_SZ};
       6              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       7              : use crate::virtual_file::VirtualFile;
       8              : use std::cmp::min;
       9              : use std::fs::OpenOptions;
      10              : use std::io::{self, ErrorKind};
      11              : use std::ops::DerefMut;
      12              : use std::path::PathBuf;
      13              : use std::sync::atomic::AtomicU64;
      14              : use tracing::*;
      15              : use utils::id::{TenantId, TimelineId};
      16              : 
      17              : pub struct EphemeralFile {
      18              :     page_cache_file_id: page_cache::FileId,
      19              : 
      20              :     _tenant_id: TenantId,
      21              :     _timeline_id: TimelineId,
      22              :     file: VirtualFile,
      23              :     len: u64,
      24              :     /// An ephemeral file is append-only.
      25              :     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
      26              :     /// The other pages, which can no longer be modified, are accessed through the page cache.
      27              :     mutable_tail: [u8; PAGE_SZ],
      28              : }
      29              : 
      30              : impl EphemeralFile {
      31         6869 :     pub fn create(
      32         6869 :         conf: &PageServerConf,
      33         6869 :         tenant_id: TenantId,
      34         6869 :         timeline_id: TimelineId,
      35         6869 :     ) -> Result<EphemeralFile, io::Error> {
      36         6869 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      37         6869 :         let filename_disambiguator =
      38         6869 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      39         6869 : 
      40         6869 :         let filename = conf
      41         6869 :             .timeline_path(&tenant_id, &timeline_id)
      42         6869 :             .join(PathBuf::from(format!("ephemeral-{filename_disambiguator}")));
      43              : 
      44         6869 :         let file = VirtualFile::open_with_options(
      45         6869 :             &filename,
      46         6869 :             OpenOptions::new().read(true).write(true).create(true),
      47         6869 :         )?;
      48              : 
      49         6869 :         Ok(EphemeralFile {
      50         6869 :             page_cache_file_id: page_cache::next_file_id(),
      51         6869 :             _tenant_id: tenant_id,
      52         6869 :             _timeline_id: timeline_id,
      53         6869 :             file,
      54         6869 :             len: 0,
      55         6869 :             mutable_tail: [0u8; PAGE_SZ],
      56         6869 :         })
      57         6869 :     }
      58              : 
      59       733761 :     pub(crate) fn len(&self) -> u64 {
      60       733761 :         self.len
      61       733761 :     }
      62              : 
      63     96762958 :     pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
      64     96762870 :         let flushed_blknums = 0..self.len / PAGE_SZ as u64;
      65     96762870 :         if flushed_blknums.contains(&(blknum as u64)) {
      66     94136815 :             let cache = page_cache::get();
      67              :             loop {
      68     95788974 :                 match cache
      69     95788974 :                     .read_immutable_buf(self.page_cache_file_id, blknum)
      70       592846 :                     .await
      71     95788973 :                     .map_err(|e| {
      72            0 :                         std::io::Error::new(
      73            0 :                             std::io::ErrorKind::Other,
      74            0 :                             // order path before error because error is anyhow::Error => might have many contexts
      75            0 :                             format!(
      76            0 :                                 "ephemeral file: read immutable page #{}: {}: {:#}",
      77            0 :                                 blknum,
      78            0 :                                 self.file.path.display(),
      79            0 :                                 e,
      80            0 :                             ),
      81            0 :                         )
      82     95788973 :                     })? {
      83     94136814 :                     page_cache::ReadBufResult::Found(guard) => {
      84     94136814 :                         return Ok(BlockLease::PageReadGuard(guard))
      85              :                     }
      86      1652159 :                     page_cache::ReadBufResult::NotFound(mut write_guard) => {
      87      1652159 :                         let buf: &mut [u8] = write_guard.deref_mut();
      88      1652159 :                         debug_assert_eq!(buf.len(), PAGE_SZ);
      89      1652159 :                         self.file
      90      1652159 :                             .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
      91            0 :                             .await?;
      92      1652159 :                         write_guard.mark_valid();
      93      1652159 : 
      94      1652159 :                         // Swap for read lock
      95      1652159 :                         continue;
      96              :                     }
      97              :                 };
      98              :             }
      99              :         } else {
     100      2626055 :             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
     101      2626055 :             Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
     102              :         }
     103     96762869 :     }
     104              : 
     105     82556103 :     pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
     106     82556073 :         struct Writer<'a> {
     107     82556073 :             ephemeral_file: &'a mut EphemeralFile,
     108     82556073 :             /// The block to which the next [`push_bytes`] will write.
     109     82556073 :             blknum: u32,
     110     82556073 :             /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
     111     82556073 :             off: usize,
     112     82556073 :         }
     113     82556073 :         impl<'a> Writer<'a> {
     114     82556103 :             fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
     115     82556103 :                 Ok(Writer {
     116     82556103 :                     blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
     117     82556103 :                     off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
     118     82556103 :                     ephemeral_file,
     119     82556103 :                 })
     120     82556103 :             }
     121     82556073 :             #[inline(always)]
     122    165112146 :             async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
     123    165112146 :                 let mut src_remaining = src;
     124    334152305 :                 while !src_remaining.is_empty() {
     125    169040160 :                     let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
     126    169040160 :                     let n = min(dst_remaining.len(), src_remaining.len());
     127    169040160 :                     dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
     128    169040160 :                     self.off += n;
     129    169040160 :                     src_remaining = &src_remaining[n..];
     130    169040160 :                     if self.off == PAGE_SZ {
     131     82556073 :                         match self
     132      3950823 :                             .ephemeral_file
     133      3950823 :                             .file
     134      3950823 :                             .write_all_at(
     135      3950823 :                                 &self.ephemeral_file.mutable_tail,
     136      3950823 :                                 self.blknum as u64 * PAGE_SZ as u64,
     137      3950823 :                             )
     138     82556073 :                             .await
     139     82556073 :                         {
     140     82556073 :                             Ok(_) => {
     141     82556073 :                                 // Pre-warm the page cache with what we just wrote.
     142     82556073 :                                 // This isn't necessary for coherency/correctness, but it's how we've always done it.
     143     82556073 :                                 let cache = page_cache::get();
     144      3950822 :                                 match cache
     145      3950822 :                                     .read_immutable_buf(
     146      3950822 :                                         self.ephemeral_file.page_cache_file_id,
     147      3950822 :                                         self.blknum,
     148      3950822 :                                     )
     149     82556073 :                                     .await
     150     82556073 :                                 {
     151     82556073 :                                     Ok(page_cache::ReadBufResult::Found(_guard)) => {
     152            0 :                                         // This function takes &mut self, so, it shouldn't be possible to reach this point.
     153            0 :                                         unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
     154     82556073 :                                     }
     155     82556073 :                                     Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
     156      3950822 :                                         let buf: &mut [u8] = write_guard.deref_mut();
     157     82556073 :                                         debug_assert_eq!(buf.len(), PAGE_SZ);
     158     82556073 :                                         buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
     159      3950822 :                                         write_guard.mark_valid();
     160     82556073 :                                         // pre-warm successful
     161     82556073 :                                     }
     162     82556073 :                                     Err(e) => {
     163     82556073 :                                         error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     164     82556073 :                                         // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     165     82556073 :                                     }
     166     82556073 :                                 }
     167     82556073 :                                 // Zero the buffer for re-use.
      168     82556073 :                                 // Zeroing is critical for correctness because the write_blob code below
     169     82556073 :                                 // and similarly read_blk expect zeroed pages.
     170     82556073 :                                 self.ephemeral_file.mutable_tail.fill(0);
     171      3950822 :                                 // This block is done, move to next one.
     172      3950822 :                                 self.blknum += 1;
     173      3950822 :                                 self.off = 0;
     174     82556073 :                             }
     175     82556073 :                             Err(e) => {
     176            0 :                                 return Err(std::io::Error::new(
     177            0 :                                     ErrorKind::Other,
     178            0 :                                     // order error before path because path is long and error is short
     179            0 :                                     format!(
     180            0 :                                         "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
     181            0 :                                         self.blknum,
     182            0 :                                         e,
     183            0 :                                         self.ephemeral_file.file.path.display(),
     184            0 :                                     ),
     185            0 :                                 ));
     186     82556073 :                             }
     187     82556073 :                         }
     188    165089337 :                     }
     189     82556073 :                 }
     190    165112145 :                 Ok(())
     191    165112145 :             }
     192     82556073 :         }
     193     82556073 : 
     194     82556073 :         let pos = self.len;
     195     82556073 :         let mut writer = Writer::new(self)?;
     196              : 
     197              :         // Write the length field
     198     82556073 :         if srcbuf.len() < 0x80 {
     199              :             // short one-byte length header
     200     61017636 :             let len_buf = [srcbuf.len() as u8];
     201     61017636 :             writer.push_bytes(&len_buf).await?;
     202              :         } else {
     203     21538437 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
     204     21538437 :             len_buf[0] |= 0x80;
     205     21538437 :             writer.push_bytes(&len_buf).await?;
     206              :         }
     207              : 
     208              :         // Write the payload
     209     82556073 :         writer.push_bytes(srcbuf).await?;
     210              : 
     211     82556072 :         if srcbuf.len() < 0x80 {
     212     61017635 :             self.len += 1;
     213     61017635 :         } else {
     214     21538437 :             self.len += 4;
     215     21538437 :         }
     216     82556072 :         self.len += srcbuf.len() as u64;
     217     82556072 : 
     218     82556072 :         Ok(pos)
     219     82556072 :     }
     220              : }
     221              : 
     222              : /// Does the given filename look like an ephemeral file?
     223              : pub fn is_ephemeral_file(filename: &str) -> bool {
     224           88 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     225           83 :         rest.parse::<u32>().is_ok()
     226              :     } else {
     227            5 :         false
     228              :     }
     229           88 : }
     230              : 
     231              : impl Drop for EphemeralFile {
     232         6568 :     fn drop(&mut self) {
     233         6568 :         // There might still be pages in the [`crate::page_cache`] for this file.
     234         6568 :         // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
     235         6568 : 
     236         6568 :         // unlink the file
     237         6568 :         let res = std::fs::remove_file(&self.file.path);
     238         6568 :         if let Err(e) = res {
     239           57 :             if e.kind() != std::io::ErrorKind::NotFound {
      240              :                 // Never log NotFound errors: we cannot do anything about them. On detach,
      241              :                 // the tenant directory is already gone.
     242              :                 //
     243              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
     244            0 :                 error!(
     245            0 :                     "could not remove ephemeral file '{}': {}",
     246            0 :                     self.file.path.display(),
     247            0 :                     e
     248            0 :                 );
     249           57 :             }
     250         6511 :         }
     251         6568 :     }
     252              : }
     253              : 
     254              : impl BlockReader for EphemeralFile {
     255      6254591 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     256      6254591 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     257      6254591 :     }
     258              : }
     259              : 
     260              : #[cfg(test)]
     261              : mod tests {
     262              :     use super::*;
     263              :     use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
     264              :     use rand::{thread_rng, RngCore};
     265              :     use std::fs;
     266              :     use std::str::FromStr;
     267              : 
     268            1 :     fn harness(
     269            1 :         test_name: &str,
     270            1 :     ) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> {
     271            1 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     272            1 :         let _ = fs::remove_dir_all(&repo_dir);
     273            1 :         let conf = PageServerConf::dummy_conf(repo_dir);
      274            1 :         // Make a static copy of the config. This can never be freed, but that's
     275            1 :         // OK in a test.
     276            1 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     277            1 : 
     278            1 :         let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap();
     279            1 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     280            1 :         fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?;
     281              : 
     282            1 :         Ok((conf, tenant_id, timeline_id))
     283            1 :     }
     284              : 
     285            1 :     #[tokio::test]
     286            1 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     287            1 :         let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
     288              : 
     289            1 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
     290              : 
     291            1 :         let pos_foo = file.write_blob(b"foo").await?;
     292            1 :         assert_eq!(
     293            1 :             b"foo",
     294            1 :             file.block_cursor().read_blob(pos_foo).await?.as_slice()
     295              :         );
     296            1 :         let pos_bar = file.write_blob(b"bar").await?;
     297            1 :         assert_eq!(
     298            1 :             b"foo",
     299            1 :             file.block_cursor().read_blob(pos_foo).await?.as_slice()
     300              :         );
     301            1 :         assert_eq!(
     302            1 :             b"bar",
     303            1 :             file.block_cursor().read_blob(pos_bar).await?.as_slice()
     304              :         );
     305              : 
     306            1 :         let mut blobs = Vec::new();
     307        10001 :         for i in 0..10000 {
     308        10000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     309        10000 :             let pos = file.write_blob(&data).await?;
     310        10000 :             blobs.push((pos, data));
     311              :         }
      312              :         // also test with large blobs
     313          101 :         for i in 0..100 {
     314          100 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     315          100 :             let pos = file.write_blob(&data).await?;
     316          100 :             blobs.push((pos, data));
     317              :         }
     318              : 
     319            1 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     320        10101 :         for (pos, expected) in blobs {
     321        10100 :             let actual = cursor.read_blob(pos).await?;
     322        10100 :             assert_eq!(actual, expected);
     323              :         }
     324              : 
     325              :         // Test a large blob that spans multiple pages
     326            1 :         let mut large_data = Vec::new();
     327            1 :         large_data.resize(20000, 0);
     328            1 :         thread_rng().fill_bytes(&mut large_data);
     329            1 :         let pos_large = file.write_blob(&large_data).await?;
     330            1 :         let result = file.block_cursor().read_blob(pos_large).await?;
     331            1 :         assert_eq!(result, large_data);
     332              : 
     333            1 :         Ok(())
     334              :     }
     335              : }
        

Generated by: LCOV version 2.1-beta