       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache::{self, PAGE_SZ};
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use camino::Utf8PathBuf;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use std::cmp::min;
      12              : 
      13              : use std::io::{self, ErrorKind};
      14              : use std::ops::DerefMut;
      15              : use std::sync::atomic::AtomicU64;
      16              : use tracing::*;
      17              : use utils::id::TimelineId;
      18              : 
      19              : pub struct EphemeralFile {
      20              :     page_cache_file_id: page_cache::FileId,
      21              : 
      22              :     _tenant_shard_id: TenantShardId,
      23              :     _timeline_id: TimelineId,
      24              :     file: VirtualFile,
      25              :     len: u64,
      26              :     /// An ephemeral file is append-only.
      27              :     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
      28              :     /// The other pages, which can no longer be modified, are accessed through the page cache.
      29              :     mutable_tail: [u8; PAGE_SZ],
      30              : }
      31              : 
      32              : impl EphemeralFile {
      33         5715 :     pub async fn create(
      34         5715 :         conf: &PageServerConf,
      35         5715 :         tenant_shard_id: TenantShardId,
      36         5715 :         timeline_id: TimelineId,
      37         5715 :     ) -> Result<EphemeralFile, io::Error> {
      38         5715 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      39         5715 :         let filename_disambiguator =
      40         5715 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      41         5715 : 
      42         5715 :         let filename = conf
      43         5715 :             .timeline_path(&tenant_shard_id, &timeline_id)
      44         5715 :             .join(Utf8PathBuf::from(format!(
      45         5715 :                 "ephemeral-{filename_disambiguator}"
      46         5715 :             )));
      47              : 
      48         5715 :         let file = VirtualFile::open_with_options(
      49         5715 :             &filename,
      50         5715 :             virtual_file::OpenOptions::new()
      51         5715 :                 .read(true)
      52         5715 :                 .write(true)
      53         5715 :                 .create(true),
      54         5715 :         )
      55          271 :         .await?;
      56              : 
      57         5715 :         Ok(EphemeralFile {
      58         5715 :             page_cache_file_id: page_cache::next_file_id(),
      59         5715 :             _tenant_shard_id: tenant_shard_id,
      60         5715 :             _timeline_id: timeline_id,
      61         5715 :             file,
      62         5715 :             len: 0,
      63         5715 :             mutable_tail: [0u8; PAGE_SZ],
      64         5715 :         })
      65         5715 :     }
      66              : 
      67      1400622 :     pub(crate) fn len(&self) -> u64 {
      68      1400622 :         self.len
      69      1400622 :     }
      70              : 
      71     74145201 :     pub(crate) async fn read_blk(
      72     74145201 :         &self,
      73     74145201 :         blknum: u32,
      74     74145201 :         ctx: &RequestContext,
      75     74145201 :     ) -> Result<BlockLease, io::Error> {
      76     74145199 :         let flushed_blknums = 0..self.len / PAGE_SZ as u64;
      77     74145199 :         if flushed_blknums.contains(&(blknum as u64)) {
      78     73271138 :             let cache = page_cache::get();
      79     73271138 :             match cache
      80     73271138 :                 .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
      81       943218 :                 .await
      82     73271137 :                 .map_err(|e| {
      83            0 :                     std::io::Error::new(
      84            0 :                         std::io::ErrorKind::Other,
      85            0 :                         // order path before error because error is anyhow::Error => might have many contexts
      86            0 :                         format!(
      87            0 :                             "ephemeral file: read immutable page #{}: {}: {:#}",
      88            0 :                             blknum, self.file.path, e,
      89            0 :                         ),
      90            0 :                     )
      91     73271137 :                 })? {
      92     71740078 :                 page_cache::ReadBufResult::Found(guard) => {
      93     71740078 :                     return Ok(BlockLease::PageReadGuard(guard))
      94              :                 }
      95      1531059 :                 page_cache::ReadBufResult::NotFound(write_guard) => {
      96      1531059 :                     let write_guard = self
      97      1531059 :                         .file
      98      1531059 :                         .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
      99        31604 :                         .await?;
     100      1531059 :                     let read_guard = write_guard.mark_valid();
     101      1531059 :                     return Ok(BlockLease::PageReadGuard(read_guard));
     102              :                 }
     103              :             };
     104              :         } else {
     105       874061 :             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
     106       874061 :             Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
     107              :         }
     108     74145198 :     }
     109              : 
     110     65328307 :     pub(crate) async fn write_blob(
     111     65328307 :         &mut self,
     112     65328307 :         srcbuf: &[u8],
     113     65328307 :         ctx: &RequestContext,
     114     65328307 :     ) -> Result<u64, io::Error> {
     115     65328307 :         struct Writer<'a> {
     116     65328307 :             ephemeral_file: &'a mut EphemeralFile,
     117     65328307 :             /// The block to which the next [`push_bytes`] will write.
     118     65328307 :             blknum: u32,
     119     65328307 :             /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
     120     65328307 :             off: usize,
     121     65328307 :         }
     122     65328307 :         impl<'a> Writer<'a> {
     123     65328307 :             fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
     124     65328307 :                 Ok(Writer {
     125     65328307 :                     blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
     126     65328307 :                     off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
     127     65328307 :                     ephemeral_file,
     128     65328307 :                 })
     129     65328307 :             }
     130     65328307 :             #[inline(always)]
     131    130656614 :             async fn push_bytes(
     132    130656614 :                 &mut self,
     133    130656614 :                 src: &[u8],
     134    130656614 :                 ctx: &RequestContext,
     135    130656614 :             ) -> Result<(), io::Error> {
     136    130656614 :                 let mut src_remaining = src;
     137    265015021 :                 while !src_remaining.is_empty() {
     138    134358408 :                     let dst_remaining = &mut self.ephemeral_file.mutable_tail[];
     139    134358408 :                     let n = min(dst_remaining.len(), src_remaining.len());
     140    134358408 :                     dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
     141    134358408 :            += n;
     142    134358408 :                     src_remaining = &src_remaining[n..];
     143    134358408 :                     if == PAGE_SZ {
     144     65328307 :                         match self
     145      3717705 :                             .ephemeral_file
     146      3717705 :                             .file
     147      3717705 :                             .write_all_at(
     148      3717705 :                                 &self.ephemeral_file.mutable_tail,
     149      3717705 :                                 self.blknum as u64 * PAGE_SZ as u64,
     150      3717705 :                             )
     151     65328307 :                             .await
     152     65328307 :                         {
     153     65328307 :                             Ok(_) => {
     154     65328307 :                                 // Pre-warm the page cache with what we just wrote.
     155     65328307 :                                 // This isn't necessary for coherency/correctness, but it's how we've always done it.
     156     65328307 :                                 let cache = page_cache::get();
     157      3717704 :                                 match cache
     158      3717704 :                                     .read_immutable_buf(
     159      3717704 :                                         self.ephemeral_file.page_cache_file_id,
     160      3717704 :                                         self.blknum,
     161      3717704 :                                         ctx,
     162      3717704 :                                     )
     163     65328307 :                                     .await
     164     65328307 :                                 {
     165     65328307 :                                     Ok(page_cache::ReadBufResult::Found(_guard)) => {
     166            0 :                                         // This function takes &mut self, so, it shouldn't be possible to reach this point.
     167            0 :                                         unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
     168     65328307 :                                     }
     169     65328307 :                                     Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
     170      3717704 :                                         let buf: &mut [u8] = write_guard.deref_mut();
     171     65328307 :                                         debug_assert_eq!(buf.len(), PAGE_SZ);
     172     65328307 :                                         buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
     173      3717704 :                                         let _ = write_guard.mark_valid();
     174     65328307 :                                         // pre-warm successful
     175     65328307 :                                     }
     176     65328307 :                                     Err(e) => {
     177     65328307 :                                         error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
     178     65328307 :                                         // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
     179     65328307 :                                     }
     180     65328307 :                                 }
     181     65328307 :                                 // Zero the buffer for re-use.
     182     65328307 :                                 // Zeroing is critical for correcntess because the write_blob code below
     183     65328307 :                                 // and similarly read_blk expect zeroed pages.
     184     65328307 :                                 self.ephemeral_file.mutable_tail.fill(0);
     185      3717704 :                                 // This block is done, move to next one.
     186      3717704 :                                 self.blknum += 1;
     187      3717704 :                        = 0;
     188     65328307 :                             }
     189     65328307 :                             Err(e) => {
     190            0 :                                 return Err(std::io::Error::new(
     191            0 :                                     ErrorKind::Other,
     192            0 :                                     // order error before path because path is long and error is short
     193            0 :                                     format!(
     194            0 :                                         "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
     195            0 :                                         self.blknum,
     196            0 :                                         e,
     197            0 :                                         self.ephemeral_file.file.path,
     198            0 :                                     ),
     199            0 :                                 ));
     200     65328307 :                             }
     201     65328307 :                         }
     202    130640703 :                     }
     203     65328307 :                 }
     204    130656613 :                 Ok(())
     205    130656613 :             }
     206     65328307 :         }
     207     65328307 : 
     208     65328307 :         let pos = self.len;
     209     65328307 :         let mut writer = Writer::new(self)?;
     210              : 
     211              :         // Write the length field
     212     65328307 :         if srcbuf.len() < 0x80 {
     213              :             // short one-byte length header
     214     52117855 :             let len_buf = [srcbuf.len() as u8];
     215     52117855 :             writer.push_bytes(&len_buf, ctx).await?;
     216              :         } else {
     217     13210452 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
     218     13210452 :             len_buf[0] |= 0x80;
     219     13210452 :             writer.push_bytes(&len_buf, ctx).await?;
     220              :         }
     221              : 
     222              :         // Write the payload
     223     65328307 :         writer.push_bytes(srcbuf, ctx).await?;
     224              : 
     225     65328306 :         if srcbuf.len() < 0x80 {
     226     52117854 :             self.len += 1;
     227     52117854 :         } else {
     228     13210452 :             self.len += 4;
     229     13210452 :         }
     230     65328306 :         self.len += srcbuf.len() as u64;
     231     65328306 : 
     232     65328306 :         Ok(pos)
     233     65328306 :     }
     234              : }
     235              : 
     236              : /// Does the given filename look like an ephemeral file?
     237           48 : pub fn is_ephemeral_file(filename: &str) -> bool {
     238           48 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     239           47 :         rest.parse::<u32>().is_ok()
     240              :     } else {
     241            1 :         false
     242              :     }
     243           48 : }
     244              : 
     245              : impl Drop for EphemeralFile {
     246         5358 :     fn drop(&mut self) {
     247         5358 :         // There might still be pages in the [`crate::page_cache`] for this file.
     248         5358 :         // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
     249         5358 : 
     250         5358 :         // unlink the file
     251         5358 :         let res = std::fs::remove_file(&self.file.path);
     252         5358 :         if let Err(e) = res {
     253           61 :             if e.kind() != std::io::ErrorKind::NotFound {
     254              :                 // just never log the not found errors, we cannot do anything for them; on detach
     255              :                 // the tenant directory is already gone.
     256              :                 //
     257              :                 // not found files might also be related to
     258            0 :                 error!(
     259            0 :                     "could not remove ephemeral file '{}': {}",
     260            0 :                     self.file.path, e
     261            0 :                 );
     262           61 :             }
     263         5297 :         }
     264         5358 :     }
     265              : }
     266              : 
     267              : impl BlockReader for EphemeralFile {
     268      6967533 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     269      6967533 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     270      6967533 :     }
     271              : }
     272              : 
     273              : #[cfg(test)]
     274              : mod tests {
     275              :     use super::*;
     276              :     use crate::context::DownloadBehavior;
     277              :     use crate::task_mgr::TaskKind;
     278              :     use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
     279              :     use rand::{thread_rng, RngCore};
     280              :     use std::fs;
     281              :     use std::str::FromStr;
     282              : 
     283            2 :     fn harness(
     284            2 :         test_name: &str,
     285            2 :     ) -> Result<
     286            2 :         (
     287            2 :             &'static PageServerConf,
     288            2 :             TenantShardId,
     289            2 :             TimelineId,
     290            2 :             RequestContext,
     291            2 :         ),
     292            2 :         io::Error,
     293            2 :     > {
     294            2 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     295            2 :         let _ = fs::remove_dir_all(&repo_dir);
     296            2 :         let conf = PageServerConf::dummy_conf(repo_dir);
     297            2 :         // Make a static copy of the config. This can never be free'd, but that's
     298            2 :         // OK in a test.
     299            2 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     300            2 : 
     301            2 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     302            2 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     303            2 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     304              : 
     305            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     306            2 : 
     307            2 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     308            2 :     }
     309              : 
     310            2 :     #[tokio::test]
     311            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     312            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     313              : 
     314            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
     315              : 
     316            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     317            2 :         assert_eq!(
     318            2 :             b"foo",
     319            2 :             file.block_cursor()
     320            2 :                 .read_blob(pos_foo, &ctx)
     321            0 :                 .await?
     322            2 :                 .as_slice()
     323              :         );
     324            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     325            2 :         assert_eq!(
     326            2 :             b"foo",
     327            2 :             file.block_cursor()
     328            2 :                 .read_blob(pos_foo, &ctx)
     329            0 :                 .await?
     330            2 :                 .as_slice()
     331              :         );
     332            2 :         assert_eq!(
     333            2 :             b"bar",
     334            2 :             file.block_cursor()
     335            2 :                 .read_blob(pos_bar, &ctx)
     336            0 :                 .await?
     337            2 :                 .as_slice()
     338              :         );
     339              : 
     340            2 :         let mut blobs = Vec::new();
     341        20002 :         for i in 0..10000 {
     342        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     343        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     344        20000 :             blobs.push((pos, data));
     345              :         }
     346              :         // also test with a large blobs
     347          202 :         for i in 0..100 {
     348          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     349          200 :             let pos = file.write_blob(&data, &ctx).await?;
     350          200 :             blobs.push((pos, data));
     351              :         }
     352              : 
     353            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     354        20202 :         for (pos, expected) in blobs {
     355        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     356        20200 :             assert_eq!(actual, expected);
     357              :         }
     358              : 
     359              :         // Test a large blob that spans multiple pages
     360            2 :         let mut large_data = vec![0; 20000];
     361            2 :         thread_rng().fill_bytes(&mut large_data);
     362            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     363            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     364            2 :         assert_eq!(result, large_data);
     365              : 
     366            2 :         Ok(())
     367              :     }
     368              : }

