LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: fcf55189004bd3119eed75e2873a97da8078700c.info Lines: 96.7 % 151 146
Test Date: 2024-06-25 12:07:31 Functions: 92.3 % 13 12

            Line data    Source code
//! Implementation of an append-only file data structure,
//! used to keep in-memory layers that have been spilled to disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache;
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use camino::Utf8PathBuf;
      10              : use pageserver_api::shard::TenantShardId;
      11              : 
      12              : use std::io;
      13              : use std::sync::atomic::AtomicU64;
      14              : use utils::id::TimelineId;
      15              : 
/// Append-only file used to keep an in-memory layer spilled on disk.
///
/// Instances are obtained through `EphemeralFile::create`; all reads and
/// writes are delegated to the wrapped `page_caching::RW`.
pub struct EphemeralFile {
    // Recorded at creation time. The leading underscore marks the field as
    // intentionally unread by the code in this file.
    _tenant_shard_id: TenantShardId,
    // Recorded at creation time; likewise not read by the code in this file.
    _timeline_id: TimelineId,

    // Reader/writer over the underlying `VirtualFile`; it also tracks the
    // number of bytes written so far and the page cache file id.
    rw: page_caching::RW,
}
      22              : 
      23              : mod page_caching;
      24              : mod zero_padded_read_write;
      25              : 
      26              : impl EphemeralFile {
      27         1242 :     pub async fn create(
      28         1242 :         conf: &PageServerConf,
      29         1242 :         tenant_shard_id: TenantShardId,
      30         1242 :         timeline_id: TimelineId,
      31         1242 :         ctx: &RequestContext,
      32         1242 :     ) -> Result<EphemeralFile, io::Error> {
      33         1242 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      34         1242 :         let filename_disambiguator =
      35         1242 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      36         1242 : 
      37         1242 :         let filename = conf
      38         1242 :             .timeline_path(&tenant_shard_id, &timeline_id)
      39         1242 :             .join(Utf8PathBuf::from(format!(
      40         1242 :                 "ephemeral-{filename_disambiguator}"
      41         1242 :             )));
      42              : 
      43         1242 :         let file = VirtualFile::open_with_options(
      44         1242 :             &filename,
      45         1242 :             virtual_file::OpenOptions::new()
      46         1242 :                 .read(true)
      47         1242 :                 .write(true)
      48         1242 :                 .create(true),
      49         1242 :             ctx,
      50         1242 :         )
      51          690 :         .await?;
      52              : 
      53         1242 :         Ok(EphemeralFile {
      54         1242 :             _tenant_shard_id: tenant_shard_id,
      55         1242 :             _timeline_id: timeline_id,
      56         1242 :             rw: page_caching::RW::new(file),
      57         1242 :         })
      58         1242 :     }
      59              : 
      60      5091658 :     pub(crate) fn len(&self) -> u64 {
      61      5091658 :         self.rw.bytes_written()
      62      5091658 :     }
      63              : 
      64         1240 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
      65         1240 :         self.rw.page_cache_file_id()
      66         1240 :     }
      67              : 
      68      4955954 :     pub(crate) async fn read_blk(
      69      4955954 :         &self,
      70      4955954 :         blknum: u32,
      71      4955954 :         ctx: &RequestContext,
      72      4955954 :     ) -> Result<BlockLease, io::Error> {
      73      4955954 :         self.rw.read_blk(blknum, ctx).await
      74      4955954 :     }
      75              : 
      76      5110624 :     pub(crate) async fn write_blob(
      77      5110624 :         &mut self,
      78      5110624 :         srcbuf: &[u8],
      79      5110624 :         ctx: &RequestContext,
      80      5110624 :     ) -> Result<u64, io::Error> {
      81      5110624 :         let pos = self.rw.bytes_written();
      82      5110624 : 
      83      5110624 :         // Write the length field
      84      5110624 :         if srcbuf.len() < 0x80 {
      85              :             // short one-byte length header
      86      4956806 :             let len_buf = [srcbuf.len() as u8];
      87      4956806 : 
      88      4956806 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      89              :         } else {
      90       153818 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
      91       153818 :             len_buf[0] |= 0x80;
      92       153818 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      93              :         }
      94              : 
      95              :         // Write the payload
      96      5110624 :         self.rw.write_all_borrowed(srcbuf, ctx).await?;
      97              : 
      98      5110624 :         Ok(pos)
      99      5110624 :     }
     100              : }
     101              : 
     102              : /// Does the given filename look like an ephemeral file?
     103            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     104            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     105            0 :         rest.parse::<u32>().is_ok()
     106              :     } else {
     107            0 :         false
     108              :     }
     109            0 : }
     110              : 
     111              : impl BlockReader for EphemeralFile {
     112       607157 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     113       607157 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     114       607157 :     }
     115              : }
     116              : 
     117              : #[cfg(test)]
     118              : mod tests {
     119              :     use super::*;
     120              :     use crate::context::DownloadBehavior;
     121              :     use crate::task_mgr::TaskKind;
     122              :     use crate::tenant::block_io::BlockReaderRef;
     123              :     use rand::{thread_rng, RngCore};
     124              :     use std::fs;
     125              :     use std::str::FromStr;
     126              : 
     127            2 :     fn harness(
     128            2 :         test_name: &str,
     129            2 :     ) -> Result<
     130            2 :         (
     131            2 :             &'static PageServerConf,
     132            2 :             TenantShardId,
     133            2 :             TimelineId,
     134            2 :             RequestContext,
     135            2 :         ),
     136            2 :         io::Error,
     137            2 :     > {
     138            2 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     139            2 :         let _ = fs::remove_dir_all(&repo_dir);
     140            2 :         let conf = PageServerConf::dummy_conf(repo_dir);
     141            2 :         // Make a static copy of the config. This can never be free'd, but that's
     142            2 :         // OK in a test.
     143            2 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     144            2 : 
     145            2 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     146            2 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     147            2 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     148              : 
     149            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     150            2 : 
     151            2 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     152            2 :     }
     153              : 
     154              :     #[tokio::test]
     155            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     156            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     157            2 : 
     158            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
     159            2 : 
     160            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     161            2 :         assert_eq!(
     162            2 :             b"foo",
     163            2 :             file.block_cursor()
     164            2 :                 .read_blob(pos_foo, &ctx)
     165            2 :                 .await?
     166            2 :                 .as_slice()
     167            2 :         );
     168            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     169            2 :         assert_eq!(
     170            2 :             b"foo",
     171            2 :             file.block_cursor()
     172            2 :                 .read_blob(pos_foo, &ctx)
     173            2 :                 .await?
     174            2 :                 .as_slice()
     175            2 :         );
     176            2 :         assert_eq!(
     177            2 :             b"bar",
     178            2 :             file.block_cursor()
     179            2 :                 .read_blob(pos_bar, &ctx)
     180            2 :                 .await?
     181            2 :                 .as_slice()
     182            2 :         );
     183            2 : 
     184            2 :         let mut blobs = Vec::new();
     185        20002 :         for i in 0..10000 {
     186        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     187        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     188        20000 :             blobs.push((pos, data));
     189            2 :         }
     190            2 :         // also test with a large blobs
     191          202 :         for i in 0..100 {
     192          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     193          200 :             let pos = file.write_blob(&data, &ctx).await?;
     194          200 :             blobs.push((pos, data));
     195            2 :         }
     196            2 : 
     197            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     198        20202 :         for (pos, expected) in blobs {
     199        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     200        20200 :             assert_eq!(actual, expected);
     201            2 :         }
     202            2 : 
     203            2 :         // Test a large blob that spans multiple pages
     204            2 :         let mut large_data = vec![0; 20000];
     205            2 :         thread_rng().fill_bytes(&mut large_data);
     206            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     207            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     208            2 :         assert_eq!(result, large_data);
     209            2 : 
     210            2 :         Ok(())
     211            2 :     }
     212              : }
        

Generated by: LCOV version 2.1-beta