LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.3 % 272 243 29 243
Current Date: 2024-01-09 02:06:09 Functions: 84.2 % 19 16 3 16
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Implementation of append-only file data structure
       2                 : //! used to keep in-memory layers spilled on disk.
       3                 : 
       4                 : use crate::config::PageServerConf;
       5                 : use crate::context::RequestContext;
       6                 : use crate::page_cache::{self, PAGE_SZ};
       7                 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8                 : use crate::virtual_file::VirtualFile;
       9                 : use camino::Utf8PathBuf;
      10                 : use pageserver_api::shard::TenantShardId;
      11                 : use std::cmp::min;
      12                 : use std::fs::OpenOptions;
      13                 : use std::io::{self, ErrorKind};
      14                 : use std::ops::DerefMut;
      15                 : use std::sync::atomic::AtomicU64;
      16                 : use tracing::*;
      17                 : use utils::id::TimelineId;
      18                 : 
      19                 : pub struct EphemeralFile {
      20                 :     page_cache_file_id: page_cache::FileId,
      21                 : 
      22                 :     _tenant_shard_id: TenantShardId,
      23                 :     _timeline_id: TimelineId,
      24                 :     file: VirtualFile,
      25                 :     len: u64,
      26                 :     /// An ephemeral file is append-only.
      27                 :     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
      28                 :     /// The other pages, which can no longer be modified, are accessed through the page cache.
      29                 :     mutable_tail: [u8; PAGE_SZ],
      30                 : }
      31                 : 
      32                 : impl EphemeralFile {
      33 CBC        4824 :     pub async fn create(
      34            4824 :         conf: &PageServerConf,
      35            4824 :         tenant_shard_id: TenantShardId,
      36            4824 :         timeline_id: TimelineId,
      37            4824 :     ) -> Result<EphemeralFile, io::Error> {
      38            4824 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      39            4824 :         let filename_disambiguator =
      40            4824 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      41            4824 : 
      42            4824 :         let filename = conf
      43            4824 :             .timeline_path(&tenant_shard_id, &timeline_id)
      44            4824 :             .join(Utf8PathBuf::from(format!(
      45            4824 :                 "ephemeral-{filename_disambiguator}"
      46            4824 :             )));
      47                 : 
      48            4824 :         let file = VirtualFile::open_with_options(
      49            4824 :             &filename,
      50            4824 :             OpenOptions::new().read(true).write(true).create(true),
      51            4824 :         )
      52 UBC           0 :         .await?;
      53                 : 
      54 CBC        4824 :         Ok(EphemeralFile {
      55            4824 :             page_cache_file_id: page_cache::next_file_id(),
      56            4824 :             _tenant_shard_id: tenant_shard_id,
      57            4824 :             _timeline_id: timeline_id,
      58            4824 :             file,
      59            4824 :             len: 0,
      60            4824 :             mutable_tail: [0u8; PAGE_SZ],
      61            4824 :         })
      62            4824 :     }
      63                 : 
      64          565114 :     pub(crate) fn len(&self) -> u64 {
      65          565114 :         self.len
      66          565114 :     }
      67                 : 
      68        62755269 :     pub(crate) async fn read_blk(
      69        62755269 :         &self,
      70        62755269 :         blknum: u32,
      71        62755269 :         ctx: &RequestContext,
      72        62755269 :     ) -> Result<BlockLease, io::Error> {
      73        62755195 :         let flushed_blknums = 0..self.len / PAGE_SZ as u64;
      74        62755195 :         if flushed_blknums.contains(&(blknum as u64)) {
      75        62185362 :             let cache = page_cache::get();
      76        62185362 :             match cache
      77        62185362 :                 .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
      78          801599 :                 .await
      79        62185361 :                 .map_err(|e| {
      80 UBC           0 :                     std::io::Error::new(
      81               0 :                         std::io::ErrorKind::Other,
      82               0 :                         // order path before error because error is anyhow::Error => might have many contexts
      83               0 :                         format!(
      84               0 :                             "ephemeral file: read immutable page #{}: {}: {:#}",
      85               0 :                             blknum, self.file.path, e,
      86               0 :                         ),
      87               0 :                     )
      88 CBC    62185361 :                 })? {
      89        60937274 :                 page_cache::ReadBufResult::Found(guard) => {
      90        60937274 :                     return Ok(BlockLease::PageReadGuard(guard))
      91                 :                 }
      92         1248087 :                 page_cache::ReadBufResult::NotFound(mut write_guard) => {
      93         1248087 :                     let buf: &mut [u8] = write_guard.deref_mut();
      94         1248087 :                     debug_assert_eq!(buf.len(), PAGE_SZ);
      95         1248087 :                     self.file
      96         1248087 :                         .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
      97 UBC           0 :                         .await?;
      98 CBC     1248087 :                     let read_guard = write_guard.mark_valid();
      99         1248087 :                     return Ok(BlockLease::PageReadGuard(read_guard));
     100                 :                 }
     101                 :             };
     102                 :         } else {
     103          569833 :             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
     104          569833 :             Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
     105                 :         }
     106        62755194 :     }
     107                 : 
     108        53139611 :     pub(crate) async fn write_blob(
     109        53139611 :         &mut self,
     110        53139611 :         srcbuf: &[u8],
     111        53139611 :         ctx: &RequestContext,
     112        53139611 :     ) -> Result<u64, io::Error> {
     113        53139611 :         struct Writer<'a> {
     114        53139611 :             ephemeral_file: &'a mut EphemeralFile,
     115        53139611 :             /// The block to which the next [`push_bytes`] will write.
     116        53139611 :             blknum: u32,
     117        53139611 :             /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
     118        53139611 :             off: usize,
     119        53139611 :         }
     120        53139611 :         impl<'a> Writer<'a> {
     121        53139611 :             fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
     122        53139611 :                 Ok(Writer {
     123        53139611 :                     blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
     124        53139611 :                     off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
     125        53139611 :                     ephemeral_file,
     126        53139611 :                 })
     127        53139611 :             }
     128        53139611 :             #[inline(always)]
     129       106279222 :             async fn push_bytes(
     130       106279222 :                 &mut self,
     131       106279222 :                 src: &[u8],
     132       106279222 :                 ctx: &RequestContext,
     133       106279222 :             ) -> Result<(), io::Error> {
     134       106279222 :                 let mut src_remaining = src;
     135       215769135 :                 while !src_remaining.is_empty() {
     136       109489913 :                     let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
     137       109489913 :                     let n = min(dst_remaining.len(), src_remaining.len());
     138       109489913 :                     dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
     139       109489913 :                     self.off += n;
     140       109489913 :                     src_remaining = &src_remaining[n..];
     141       109489913 :                     if self.off == PAGE_SZ {
     142        53139611 :                         match self
     143         3223139 :                             .ephemeral_file
     144         3223139 :                             .file
     145         3223139 :                             .write_all_at(
     146         3223139 :                                 &self.ephemeral_file.mutable_tail,
     147         3223139 :                                 self.blknum as u64 * PAGE_SZ as u64,
     148         3223139 :                             )
     149        53139611 :                             .await
     150        53139611 :                         {
     151        53139611 :                             Ok(_) => {
     152        53139611 :                                 // Pre-warm the page cache with what we just wrote.
     153        53139611 :                                 // This isn't necessary for coherency/correctness, but it's how we've always done it.
     154        53139611 :                                 let cache = page_cache::get();
     155         3223139 :                                 match cache
     156         3223139 :                                     .read_immutable_buf(
     157         3223139 :                                         self.ephemeral_file.page_cache_file_id,
     158         3223139 :                                         self.blknum,
     159         3223139 :                                         ctx,
     160         3223139 :                                     )
     161        53139611 :                                     .await
     162        53139611 :                                 {
     163        53139611 :                                     Ok(page_cache::ReadBufResult::Found(_guard)) => {
     164 UBC           0 :                                         // This function takes &mut self, so, it shouldn't be possible to reach this point.
     165               0 :                                         unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
     166 CBC    53139611 :                                     }
     167        53139611 :                                     Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
     168         3223139 :                                         let buf: &mut [u8] = write_guard.deref_mut();
     169        53139611 :                                         debug_assert_eq!(buf.len(), PAGE_SZ);
     170        53139611 :                                         buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
     171         3223139 :                                         let _ = write_guard.mark_valid();
     172        53139611 :                                         // pre-warm successful
     173        53139611 :                                     }
     174        53139611 :                                     Err(e) => {
     175        53139611 :                                         error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     176        53139611 :                                         // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     177        53139611 :                                     }
     178        53139611 :                                 }
     179        53139611 :                                 // Zero the buffer for re-use.
     180        53139611 :                                 // Zeroing is critical for correcntess because the write_blob code below
     181        53139611 :                                 // and similarly read_blk expect zeroed pages.
     182        53139611 :                                 self.ephemeral_file.mutable_tail.fill(0);
     183         3223139 :                                 // This block is done, move to next one.
     184         3223139 :                                 self.blknum += 1;
     185         3223139 :                                 self.off = 0;
     186        53139611 :                             }
     187        53139611 :                             Err(e) => {
     188 UBC           0 :                                 return Err(std::io::Error::new(
     189               0 :                                     ErrorKind::Other,
     190               0 :                                     // order error before path because path is long and error is short
     191               0 :                                     format!(
     192               0 :                                         "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
     193               0 :                                         self.blknum,
     194               0 :                                         e,
     195               0 :                                         self.ephemeral_file.file.path,
     196               0 :                                     ),
     197               0 :                                 ));
     198 CBC    53139611 :                             }
     199        53139611 :                         }
     200       106266774 :                     }
     201        53139611 :                 }
     202       106279222 :                 Ok(())
     203       106279222 :             }
     204        53139611 :         }
     205        53139611 : 
     206        53139611 :         let pos = self.len;
     207        53139611 :         let mut writer = Writer::new(self)?;
     208                 : 
     209                 :         // Write the length field
     210        53139611 :         if srcbuf.len() < 0x80 {
     211                 :             // short one-byte length header
     212        41116657 :             let len_buf = [srcbuf.len() as u8];
     213        41116657 :             writer.push_bytes(&len_buf, ctx).await?;
     214                 :         } else {
     215        12022954 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
     216        12022954 :             len_buf[0] |= 0x80;
     217        12022954 :             writer.push_bytes(&len_buf, ctx).await?;
     218                 :         }
     219                 : 
     220                 :         // Write the payload
     221        53139611 :         writer.push_bytes(srcbuf, ctx).await?;
     222                 : 
     223        53139611 :         if srcbuf.len() < 0x80 {
     224        41116657 :             self.len += 1;
     225        41116657 :         } else {
     226        12022954 :             self.len += 4;
     227        12022954 :         }
     228        53139611 :         self.len += srcbuf.len() as u64;
     229        53139611 : 
     230        53139611 :         Ok(pos)
     231        53139611 :     }
     232                 : }
     233                 : 
     234                 : /// Does the given filename look like an ephemeral file?
     235              57 : pub fn is_ephemeral_file(filename: &str) -> bool {
     236              57 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     237              51 :         rest.parse::<u32>().is_ok()
     238                 :     } else {
     239               6 :         false
     240                 :     }
     241              57 : }
     242                 : 
     243                 : impl Drop for EphemeralFile {
     244            4539 :     fn drop(&mut self) {
     245            4539 :         // There might still be pages in the [`crate::page_cache`] for this file.
     246            4539 :         // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
     247            4539 : 
     248            4539 :         // unlink the file
     249            4539 :         let res = std::fs::remove_file(&self.file.path);
     250            4539 :         if let Err(e) = res {
     251              66 :             if e.kind() != std::io::ErrorKind::NotFound {
     252                 :                 // just never log the not found errors, we cannot do anything for them; on detach
     253                 :                 // the tenant directory is already gone.
     254                 :                 //
     255                 :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
     256 UBC           0 :                 error!(
     257               0 :                     "could not remove ephemeral file '{}': {}",
     258               0 :                     self.file.path, e
     259               0 :                 );
     260 CBC          66 :             }
     261            4473 :         }
     262            4539 :     }
     263                 : }
     264                 : 
     265                 : impl BlockReader for EphemeralFile {
     266         4672869 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     267         4672869 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     268         4672869 :     }
     269                 : }
     270                 : 
     271                 : #[cfg(test)]
     272                 : mod tests {
     273                 :     use super::*;
     274                 :     use crate::context::DownloadBehavior;
     275                 :     use crate::task_mgr::TaskKind;
     276                 :     use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
     277                 :     use rand::{thread_rng, RngCore};
     278                 :     use std::fs;
     279                 :     use std::str::FromStr;
     280                 : 
     281               1 :     fn harness(
     282               1 :         test_name: &str,
     283               1 :     ) -> Result<
     284               1 :         (
     285               1 :             &'static PageServerConf,
     286               1 :             TenantShardId,
     287               1 :             TimelineId,
     288               1 :             RequestContext,
     289               1 :         ),
     290               1 :         io::Error,
     291               1 :     > {
     292               1 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     293               1 :         let _ = fs::remove_dir_all(&repo_dir);
     294               1 :         let conf = PageServerConf::dummy_conf(repo_dir);
     295               1 :         // Make a static copy of the config. This can never be free'd, but that's
     296               1 :         // OK in a test.
     297               1 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     298               1 : 
     299               1 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     300               1 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     301               1 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     302                 : 
     303               1 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     304               1 : 
     305               1 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     306               1 :     }
     307                 : 
     308               1 :     #[tokio::test]
     309               1 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     310               1 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     311                 : 
     312               1 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
     313                 : 
     314               1 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     315               1 :         assert_eq!(
     316               1 :             b"foo",
     317               1 :             file.block_cursor()
     318               1 :                 .read_blob(pos_foo, &ctx)
     319 UBC           0 :                 .await?
     320 CBC           1 :                 .as_slice()
     321                 :         );
     322               1 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     323               1 :         assert_eq!(
     324               1 :             b"foo",
     325               1 :             file.block_cursor()
     326               1 :                 .read_blob(pos_foo, &ctx)
     327 UBC           0 :                 .await?
     328 CBC           1 :                 .as_slice()
     329                 :         );
     330               1 :         assert_eq!(
     331               1 :             b"bar",
     332               1 :             file.block_cursor()
     333               1 :                 .read_blob(pos_bar, &ctx)
     334 UBC           0 :                 .await?
     335 CBC           1 :                 .as_slice()
     336                 :         );
     337                 : 
     338               1 :         let mut blobs = Vec::new();
     339           10001 :         for i in 0..10000 {
     340           10000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     341           10000 :             let pos = file.write_blob(&data, &ctx).await?;
     342           10000 :             blobs.push((pos, data));
     343                 :         }
     344                 :         // also test with a large blobs
     345             101 :         for i in 0..100 {
     346             100 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     347             100 :             let pos = file.write_blob(&data, &ctx).await?;
     348             100 :             blobs.push((pos, data));
     349                 :         }
     350                 : 
     351               1 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     352           10101 :         for (pos, expected) in blobs {
     353           10100 :             let actual = cursor.read_blob(pos, &ctx).await?;
     354           10100 :             assert_eq!(actual, expected);
     355                 :         }
     356                 : 
     357                 :         // Test a large blob that spans multiple pages
     358               1 :         let mut large_data = vec![0; 20000];
     359               1 :         thread_rng().fill_bytes(&mut large_data);
     360               1 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     361               1 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     362               1 :         assert_eq!(result, large_data);
     363                 : 
     364               1 :         Ok(())
     365                 :     }
     366                 : }
        

Generated by: LCOV version 2.1-beta