LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 92.3 % 310 286
Test Date: 2024-02-14 18:05:35 Functions: 84.2 % 19 16

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache::{self, PAGE_SZ};
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use bytes::BytesMut;
      10              : use camino::Utf8PathBuf;
      11              : use pageserver_api::shard::TenantShardId;
      12              : use std::cmp::min;
      13              : 
      14              : use std::io::{self, ErrorKind};
      15              : use std::ops::DerefMut;
      16              : use std::sync::atomic::AtomicU64;
      17              : use tracing::*;
      18              : use utils::id::TimelineId;
      19              : 
      20              : pub struct EphemeralFile {
      21              :     page_cache_file_id: page_cache::FileId,
      22              : 
      23              :     _tenant_shard_id: TenantShardId,
      24              :     _timeline_id: TimelineId,
      25              :     file: VirtualFile,
      26              :     len: u64,
      27              :     /// An ephemeral file is append-only.
      28              :     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
      29              :     /// The other pages, which can no longer be modified, are accessed through the page cache.
      30              :     ///
      31              :     /// None <=> IO is ongoing.
      32              :     /// Size is fixed to PAGE_SZ at creation time and must not be changed.
      33              :     mutable_tail: Option<BytesMut>,
      34              : }
      35              : 
      36              : impl EphemeralFile {
      37         5642 :     pub async fn create(
      38         5642 :         conf: &PageServerConf,
      39         5642 :         tenant_shard_id: TenantShardId,
      40         5642 :         timeline_id: TimelineId,
      41         5642 :     ) -> Result<EphemeralFile, io::Error> {
      42         5642 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      43         5642 :         let filename_disambiguator =
      44         5642 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      45         5642 : 
      46         5642 :         let filename = conf
      47         5642 :             .timeline_path(&tenant_shard_id, &timeline_id)
      48         5642 :             .join(Utf8PathBuf::from(format!(
      49         5642 :                 "ephemeral-{filename_disambiguator}"
      50         5642 :             )));
      51              : 
      52         5642 :         let file = VirtualFile::open_with_options(
      53         5642 :             &filename,
      54         5642 :             virtual_file::OpenOptions::new()
      55         5642 :                 .read(true)
      56         5642 :                 .write(true)
      57         5642 :                 .create(true),
      58         5642 :         )
      59          309 :         .await?;
      60              : 
      61         5642 :         Ok(EphemeralFile {
      62         5642 :             page_cache_file_id: page_cache::next_file_id(),
      63         5642 :             _tenant_shard_id: tenant_shard_id,
      64         5642 :             _timeline_id: timeline_id,
      65         5642 :             file,
      66         5642 :             len: 0,
      67         5642 :             mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
      68         5642 :         })
      69         5642 :     }
      70              : 
      71      1356123 :     pub(crate) fn len(&self) -> u64 {
      72      1356123 :         self.len
      73      1356123 :     }
      74              : 
      75     73932465 :     pub(crate) async fn read_blk(
      76     73932465 :         &self,
      77     73932465 :         blknum: u32,
      78     73932465 :         ctx: &RequestContext,
      79     73932472 :     ) -> Result<BlockLease, io::Error> {
      80     73932472 :         let flushed_blknums = 0..self.len / PAGE_SZ as u64;
      81     73932472 :         if flushed_blknums.contains(&(blknum as u64)) {
      82     73095003 :             let cache = page_cache::get();
      83     73095003 :             match cache
      84     73095003 :                 .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
      85       938781 :                 .await
      86     73095002 :                 .map_err(|e| {
      87            0 :                     std::io::Error::new(
      88            0 :                         std::io::ErrorKind::Other,
      89            0 :                         // order path before error because error is anyhow::Error => might have many contexts
      90            0 :                         format!(
      91            0 :                             "ephemeral file: read immutable page #{}: {}: {:#}",
      92            0 :                             blknum, self.file.path, e,
      93            0 :                         ),
      94            0 :                     )
      95     73095002 :                 })? {
      96     71579159 :                 page_cache::ReadBufResult::Found(guard) => {
      97     71579159 :                     return Ok(BlockLease::PageReadGuard(guard))
      98              :                 }
      99      1515843 :                 page_cache::ReadBufResult::NotFound(write_guard) => {
     100      1515843 :                     let write_guard = self
     101      1515843 :                         .file
     102      1515843 :                         .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
     103        31628 :                         .await?;
     104      1515843 :                     let read_guard = write_guard.mark_valid();
     105      1515843 :                     return Ok(BlockLease::PageReadGuard(read_guard));
     106              :                 }
     107              :             };
     108              :         } else {
     109       837469 :             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
     110       837469 :             Ok(BlockLease::EphemeralFileMutableTail(
     111       837469 :                 self.mutable_tail
     112       837469 :                     .as_deref()
     113       837469 :                     .expect("we're not doing IO, it must be Some()")
     114       837469 :                     .try_into()
     115       837469 :                     .expect("we ensure that it's always PAGE_SZ"),
     116       837469 :             ))
     117              :         }
     118     73932471 :     }
     119              : 
     120     64701332 :     pub(crate) async fn write_blob(
     121     64701332 :         &mut self,
     122     64701332 :         srcbuf: &[u8],
     123     64701332 :         ctx: &RequestContext,
     124     64701338 :     ) -> Result<u64, io::Error> {
     125     64701338 :         struct Writer<'a> {
     126     64701338 :             ephemeral_file: &'a mut EphemeralFile,
     127     64701338 :             /// The block to which the next [`push_bytes`] will write.
     128     64701338 :             blknum: u32,
     129     64701338 :             /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
     130     64701338 :             off: usize,
     131     64701338 :         }
     132     64701338 :         impl<'a> Writer<'a> {
     133     64701338 :             fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
     134     64701332 :                 Ok(Writer {
     135     64701332 :                     blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
     136     64701332 :                     off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
     137     64701332 :                     ephemeral_file,
     138     64701332 :                 })
     139     64701332 :             }
     140     64701338 :             #[inline(always)]
     141    129402676 :             async fn push_bytes(
     142    129402676 :                 &mut self,
     143    129402676 :                 src: &[u8],
     144    129402676 :                 ctx: &RequestContext,
     145    129402676 :             ) -> Result<(), io::Error> {
     146    129402676 :                 let mut src_remaining = src;
     147    262486543 :                 while !src_remaining.is_empty() {
     148    133083867 :                     let dst_remaining = &mut self
     149    133083867 :                         .ephemeral_file
     150    133083867 :                         .mutable_tail
     151    133083867 :                         .as_deref_mut()
     152    133083867 :                         .expect("IO is not yet ongoing")[self.off..];
     153    133083867 :                     let n = min(dst_remaining.len(), src_remaining.len());
     154    133083867 :                     dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
     155    133083867 :                     self.off += n;
     156    133083867 :                     src_remaining = &src_remaining[n..];
     157    133083867 :                     if self.off == PAGE_SZ {
     158     64701338 :                         let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
     159      3696828 :                             .expect("IO is not yet ongoing");
     160     64701338 :                         let (mutable_tail, res) = self
     161      3696828 :                             .ephemeral_file
     162      3696828 :                             .file
     163      3696828 :                             .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
     164     64701338 :                             .await;
     165     64701338 :                         // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
     166     64701338 :                         // I.e., the IO isn't retryable if we panic.
     167     64701338 :                         self.ephemeral_file.mutable_tail = Some(mutable_tail);
     168      3696828 :                         match res {
     169     64701338 :                             Ok(_) => {
     170     64701338 :                                 // Pre-warm the page cache with what we just wrote.
     171     64701338 :                                 // This isn't necessary for coherency/correctness, but it's how we've always done it.
     172     64701338 :                                 let cache = page_cache::get();
     173      3696828 :                                 match cache
     174      3696828 :                                     .read_immutable_buf(
     175      3696828 :                                         self.ephemeral_file.page_cache_file_id,
     176      3696828 :                                         self.blknum,
     177      3696828 :                                         ctx,
     178      3696828 :                                     )
     179     64701338 :                                     .await
     180     64701338 :                                 {
     181     64701338 :                                     Ok(page_cache::ReadBufResult::Found(_guard)) => {
     182            0 :                                         // This function takes &mut self, so, it shouldn't be possible to reach this point.
     183            0 :                                         unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
     184     64701338 :                                     }
     185     64701338 :                                     Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
     186      3696828 :                                         let buf: &mut [u8] = write_guard.deref_mut();
     187     64701338 :                                         debug_assert_eq!(buf.len(), PAGE_SZ);
     188     64701338 :                                         buf.copy_from_slice(
     189      3696828 :                                             self.ephemeral_file
     190      3696828 :                                                 .mutable_tail
     191      3696828 :                                                 .as_deref()
     192      3696828 :                                                 .expect("IO is not ongoing"),
     193      3696828 :                                         );
     194      3696828 :                                         let _ = write_guard.mark_valid();
     195     64701338 :                                         // pre-warm successful
     196     64701338 :                                     }
     197     64701338 :                                     Err(e) => {
     198     64701338 :                                         error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     199     64701338 :                                         // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     200     64701338 :                                     }
     201     64701338 :                                 }
     202     64701338 :                                 // Zero the buffer for re-use.
     203     64701338 :                                 // Zeroing is critical for correcntess because the write_blob code below
     204     64701338 :                                 // and similarly read_blk expect zeroed pages.
     205     64701338 :                                 self.ephemeral_file
     206      3696828 :                                     .mutable_tail
     207      3696828 :                                     .as_deref_mut()
     208      3696828 :                                     .expect("IO is not ongoing")
     209      3696828 :                                     .fill(0);
     210      3696828 :                                 // This block is done, move to next one.
     211      3696828 :                                 self.blknum += 1;
     212      3696828 :                                 self.off = 0;
     213     64701338 :                             }
     214     64701338 :                             Err(e) => {
     215            0 :                                 return Err(std::io::Error::new(
     216            0 :                                     ErrorKind::Other,
     217            0 :                                     // order error before path because path is long and error is short
     218            0 :                                     format!(
     219            0 :                                         "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
     220            0 :                                         self.blknum,
     221            0 :                                         e,
     222            0 :                                         self.ephemeral_file.file.path,
     223            0 :                                     ),
     224            0 :                                 ));
     225     64701338 :                             }
     226     64701338 :                         }
     227    129387039 :                     }
     228     64701338 :                 }
     229    129402676 :                 Ok(())
     230    129402676 :             }
     231     64701338 :         }
     232     64701338 : 
     233     64701338 :         let pos = self.len;
     234     64701338 :         let mut writer = Writer::new(self)?;
     235              : 
     236              :         // Write the length field
     237     64701338 :         if srcbuf.len() < 0x80 {
     238              :             // short one-byte length header
     239     51449836 :             let len_buf = [srcbuf.len() as u8];
     240     51449836 :             writer.push_bytes(&len_buf, ctx).await?;
     241              :         } else {
     242     13251502 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
     243     13251502 :             len_buf[0] |= 0x80;
     244     13251502 :             writer.push_bytes(&len_buf, ctx).await?;
     245              :         }
     246              : 
     247              :         // Write the payload
     248     64701338 :         writer.push_bytes(srcbuf, ctx).await?;
     249              : 
     250     64701338 :         if srcbuf.len() < 0x80 {
     251     51449836 :             self.len += 1;
     252     51449836 :         } else {
     253     13251502 :             self.len += 4;
     254     13251502 :         }
     255     64701338 :         self.len += srcbuf.len() as u64;
     256     64701338 : 
     257     64701338 :         Ok(pos)
     258     64701338 :     }
     259              : }
     260              : 
     261              : /// Does the given filename look like an ephemeral file?
     262           48 : pub fn is_ephemeral_file(filename: &str) -> bool {
     263           48 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     264           47 :         rest.parse::<u32>().is_ok()
     265              :     } else {
     266            1 :         false
     267              :     }
     268           48 : }
     269              : 
     270              : impl Drop for EphemeralFile {
     271         5275 :     fn drop(&mut self) {
     272         5275 :         // There might still be pages in the [`crate::page_cache`] for this file.
     273         5275 :         // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
     274         5275 : 
     275         5275 :         // unlink the file
     276         5275 :         let res = std::fs::remove_file(&self.file.path);
     277         5275 :         if let Err(e) = res {
     278           63 :             if e.kind() != std::io::ErrorKind::NotFound {
     279              :                 // just never log the not found errors, we cannot do anything for them; on detach
     280              :                 // the tenant directory is already gone.
     281              :                 //
     282              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
     283            0 :                 error!(
     284            0 :                     "could not remove ephemeral file '{}': {}",
     285            0 :                     self.file.path, e
     286            0 :                 );
     287           63 :             }
     288         5212 :         }
     289         5275 :     }
     290              : }
     291              : 
     292              : impl BlockReader for EphemeralFile {
     293      6030257 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     294      6030257 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     295      6030257 :     }
     296              : }
     297              : 
     298              : #[cfg(test)]
     299              : mod tests {
     300              :     use super::*;
     301              :     use crate::context::DownloadBehavior;
     302              :     use crate::task_mgr::TaskKind;
     303              :     use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
     304              :     use rand::{thread_rng, RngCore};
     305              :     use std::fs;
     306              :     use std::str::FromStr;
     307              : 
     308            2 :     fn harness(
     309            2 :         test_name: &str,
     310            2 :     ) -> Result<
     311            2 :         (
     312            2 :             &'static PageServerConf,
     313            2 :             TenantShardId,
     314            2 :             TimelineId,
     315            2 :             RequestContext,
     316            2 :         ),
     317            2 :         io::Error,
     318            2 :     > {
     319            2 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     320            2 :         let _ = fs::remove_dir_all(&repo_dir);
     321            2 :         let conf = PageServerConf::dummy_conf(repo_dir);
     322            2 :         // Make a static copy of the config. This can never be free'd, but that's
     323            2 :         // OK in a test.
     324            2 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     325            2 : 
     326            2 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     327            2 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     328            2 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     329              : 
     330            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     331            2 : 
     332            2 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     333            2 :     }
     334              : 
     335            2 :     #[tokio::test]
     336            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     337            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     338            2 : 
     339            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
     340            2 : 
     341            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     342            2 :         assert_eq!(
     343            2 :             b"foo",
     344            2 :             file.block_cursor()
     345            2 :                 .read_blob(pos_foo, &ctx)
     346            2 :                 .await?
     347            2 :                 .as_slice()
     348            2 :         );
     349            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     350            2 :         assert_eq!(
     351            2 :             b"foo",
     352            2 :             file.block_cursor()
     353            2 :                 .read_blob(pos_foo, &ctx)
     354            2 :                 .await?
     355            2 :                 .as_slice()
     356            2 :         );
     357            2 :         assert_eq!(
     358            2 :             b"bar",
     359            2 :             file.block_cursor()
     360            2 :                 .read_blob(pos_bar, &ctx)
     361            2 :                 .await?
     362            2 :                 .as_slice()
     363            2 :         );
     364            2 : 
     365            2 :         let mut blobs = Vec::new();
     366        20002 :         for i in 0..10000 {
     367        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     368        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     369        20000 :             blobs.push((pos, data));
     370            2 :         }
     371            2 :         // also test with a large blobs
     372          202 :         for i in 0..100 {
     373          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     374          200 :             let pos = file.write_blob(&data, &ctx).await?;
     375          200 :             blobs.push((pos, data));
     376            2 :         }
     377            2 : 
     378            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     379        20202 :         for (pos, expected) in blobs {
     380        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     381        20200 :             assert_eq!(actual, expected);
     382            2 :         }
     383            2 : 
     384            2 :         // Test a large blob that spans multiple pages
     385            2 :         let mut large_data = vec![0; 20000];
     386            2 :         thread_rng().fill_bytes(&mut large_data);
     387            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     388            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     389            2 :         assert_eq!(result, large_data);
     390            2 : 
     391            2 :         Ok(())
     392            2 :     }
     393              : }
        

Generated by: LCOV version 2.1-beta