LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info Lines: 94.8 % 154 146
Test Date: 2024-07-03 15:33:13 Functions: 80.0 % 15 12

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache;
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use camino::Utf8PathBuf;
      10              : use pageserver_api::shard::TenantShardId;
      11              : 
      12              : use std::io;
      13              : use std::sync::atomic::AtomicU64;
      14              : use utils::id::TimelineId;
      15              : 
      16              : pub struct EphemeralFile {
      17              :     _tenant_shard_id: TenantShardId,
      18              :     _timeline_id: TimelineId,
      19              : 
      20              :     rw: page_caching::RW,
      21              : }
      22              : 
      23              : mod page_caching;
      24              : pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
      25              : mod zero_padded_read_write;
      26              : 
      27              : impl EphemeralFile {
      28         1246 :     pub async fn create(
      29         1246 :         conf: &PageServerConf,
      30         1246 :         tenant_shard_id: TenantShardId,
      31         1246 :         timeline_id: TimelineId,
      32         1246 :         ctx: &RequestContext,
      33         1246 :     ) -> Result<EphemeralFile, io::Error> {
      34         1246 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      35         1246 :         let filename_disambiguator =
      36         1246 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      37         1246 : 
      38         1246 :         let filename = conf
      39         1246 :             .timeline_path(&tenant_shard_id, &timeline_id)
      40         1246 :             .join(Utf8PathBuf::from(format!(
      41         1246 :                 "ephemeral-{filename_disambiguator}"
      42         1246 :             )));
      43              : 
      44         1246 :         let file = VirtualFile::open_with_options(
      45         1246 :             &filename,
      46         1246 :             virtual_file::OpenOptions::new()
      47         1246 :                 .read(true)
      48         1246 :                 .write(true)
      49         1246 :                 .create(true),
      50         1246 :             ctx,
      51         1246 :         )
      52          694 :         .await?;
      53              : 
      54         1246 :         Ok(EphemeralFile {
      55         1246 :             _tenant_shard_id: tenant_shard_id,
      56         1246 :             _timeline_id: timeline_id,
      57         1246 :             rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
      58         1246 :         })
      59         1246 :     }
      60              : 
      61      5091694 :     pub(crate) fn len(&self) -> u64 {
      62      5091694 :         self.rw.bytes_written()
      63      5091694 :     }
      64              : 
      65         1244 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
      66         1244 :         self.rw.page_cache_file_id()
      67         1244 :     }
      68              : 
      69              :     /// See [`self::page_caching::RW::load_to_vec`].
      70            0 :     pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
      71            0 :         self.rw.load_to_vec(ctx).await
      72            0 :     }
      73              : 
      74      4955837 :     pub(crate) async fn read_blk(
      75      4955837 :         &self,
      76      4955837 :         blknum: u32,
      77      4955837 :         ctx: &RequestContext,
      78      4955837 :     ) -> Result<BlockLease, io::Error> {
      79      4955837 :         self.rw.read_blk(blknum, ctx).await
      80      4955837 :     }
      81              : 
      82      5110656 :     pub(crate) async fn write_blob(
      83      5110656 :         &mut self,
      84      5110656 :         srcbuf: &[u8],
      85      5110656 :         ctx: &RequestContext,
      86      5110656 :     ) -> Result<u64, io::Error> {
      87      5110656 :         let pos = self.rw.bytes_written();
      88      5110656 : 
      89      5110656 :         // Write the length field
      90      5110656 :         if srcbuf.len() < 0x80 {
      91              :             // short one-byte length header
      92      4956838 :             let len_buf = [srcbuf.len() as u8];
      93      4956838 : 
      94      4956838 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      95              :         } else {
      96       153818 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
      97       153818 :             len_buf[0] |= 0x80;
      98       153818 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      99              :         }
     100              : 
     101              :         // Write the payload
     102      5110656 :         self.rw.write_all_borrowed(srcbuf, ctx).await?;
     103              : 
     104      5110656 :         Ok(pos)
     105      5110656 :     }
     106              : }
     107              : 
     108              : /// Does the given filename look like an ephemeral file?
     109            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     110            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     111            0 :         rest.parse::<u32>().is_ok()
     112              :     } else {
     113            0 :         false
     114              :     }
     115            0 : }
     116              : 
     117              : impl BlockReader for EphemeralFile {
     118       607250 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     119       607250 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     120       607250 :     }
     121              : }
     122              : 
     123              : #[cfg(test)]
     124              : mod tests {
     125              :     use super::*;
     126              :     use crate::context::DownloadBehavior;
     127              :     use crate::task_mgr::TaskKind;
     128              :     use crate::tenant::block_io::BlockReaderRef;
     129              :     use rand::{thread_rng, RngCore};
     130              :     use std::fs;
     131              :     use std::str::FromStr;
     132              : 
     133            2 :     fn harness(
     134            2 :         test_name: &str,
     135            2 :     ) -> Result<
     136            2 :         (
     137            2 :             &'static PageServerConf,
     138            2 :             TenantShardId,
     139            2 :             TimelineId,
     140            2 :             RequestContext,
     141            2 :         ),
     142            2 :         io::Error,
     143            2 :     > {
     144            2 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     145            2 :         let _ = fs::remove_dir_all(&repo_dir);
     146            2 :         let conf = PageServerConf::dummy_conf(repo_dir);
     147            2 :         // Make a static copy of the config. This can never be free'd, but that's
     148            2 :         // OK in a test.
     149            2 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     150            2 : 
     151            2 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     152            2 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     153            2 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     154              : 
     155            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     156            2 : 
     157            2 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     158            2 :     }
     159              : 
     160              :     #[tokio::test]
     161            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     162            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     163            2 : 
     164            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
     165            2 : 
     166            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     167            2 :         assert_eq!(
     168            2 :             b"foo",
     169            2 :             file.block_cursor()
     170            2 :                 .read_blob(pos_foo, &ctx)
     171            2 :                 .await?
     172            2 :                 .as_slice()
     173            2 :         );
     174            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     175            2 :         assert_eq!(
     176            2 :             b"foo",
     177            2 :             file.block_cursor()
     178            2 :                 .read_blob(pos_foo, &ctx)
     179            2 :                 .await?
     180            2 :                 .as_slice()
     181            2 :         );
     182            2 :         assert_eq!(
     183            2 :             b"bar",
     184            2 :             file.block_cursor()
     185            2 :                 .read_blob(pos_bar, &ctx)
     186            2 :                 .await?
     187            2 :                 .as_slice()
     188            2 :         );
     189            2 : 
     190            2 :         let mut blobs = Vec::new();
     191        20002 :         for i in 0..10000 {
     192        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     193        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     194        20000 :             blobs.push((pos, data));
     195            2 :         }
     196            2 :         // also test with a large blobs
     197          202 :         for i in 0..100 {
     198          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     199          200 :             let pos = file.write_blob(&data, &ctx).await?;
     200          200 :             blobs.push((pos, data));
     201            2 :         }
     202            2 : 
     203            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     204        20202 :         for (pos, expected) in blobs {
     205        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     206        20200 :             assert_eq!(actual, expected);
     207            2 :         }
     208            2 : 
     209            2 :         // Test a large blob that spans multiple pages
     210            2 :         let mut large_data = vec![0; 20000];
     211            2 :         thread_rng().fill_bytes(&mut large_data);
     212            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     213            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     214            2 :         assert_eq!(result, large_data);
     215            2 : 
     216            2 :         Ok(())
     217            2 :     }
     218              : }
        

Generated by: LCOV version 2.1-beta