LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 96.6 % 149 144
Test Date: 2024-05-10 13:18:37 Functions: 92.3 % 13 12

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache;
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use camino::Utf8PathBuf;
      10              : use pageserver_api::shard::TenantShardId;
      11              : 
      12              : use std::io;
      13              : use std::sync::atomic::AtomicU64;
      14              : use utils::id::TimelineId;
      15              : 
      16              : pub struct EphemeralFile {
      17              :     _tenant_shard_id: TenantShardId,
      18              :     _timeline_id: TimelineId,
      19              : 
      20              :     rw: page_caching::RW,
      21              : }
      22              : 
      23              : mod page_caching;
      24              : mod zero_padded_read_write;
      25              : 
      26              : impl EphemeralFile {
      27         1052 :     pub async fn create(
      28         1052 :         conf: &PageServerConf,
      29         1052 :         tenant_shard_id: TenantShardId,
      30         1052 :         timeline_id: TimelineId,
      31         1052 :     ) -> Result<EphemeralFile, io::Error> {
      32         1052 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      33         1052 :         let filename_disambiguator =
      34         1052 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      35         1052 : 
      36         1052 :         let filename = conf
      37         1052 :             .timeline_path(&tenant_shard_id, &timeline_id)
      38         1052 :             .join(Utf8PathBuf::from(format!(
      39         1052 :                 "ephemeral-{filename_disambiguator}"
      40         1052 :             )));
      41              : 
      42         1052 :         let file = VirtualFile::open_with_options(
      43         1052 :             &filename,
      44         1052 :             virtual_file::OpenOptions::new()
      45         1052 :                 .read(true)
      46         1052 :                 .write(true)
      47         1052 :                 .create(true),
      48         1052 :         )
      49          579 :         .await?;
      50              : 
      51         1052 :         Ok(EphemeralFile {
      52         1052 :             _tenant_shard_id: tenant_shard_id,
      53         1052 :             _timeline_id: timeline_id,
      54         1052 :             rw: page_caching::RW::new(file),
      55         1052 :         })
      56         1052 :     }
      57              : 
      58      9846066 :     pub(crate) fn len(&self) -> u64 {
      59      9846066 :         self.rw.bytes_written()
      60      9846066 :     }
      61              : 
      62         1050 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
      63         1050 :         self.rw.page_cache_file_id()
      64         1050 :     }
      65              : 
      66      4930954 :     pub(crate) async fn read_blk(
      67      4930954 :         &self,
      68      4930954 :         blknum: u32,
      69      4930954 :         ctx: &RequestContext,
      70      4930954 :     ) -> Result<BlockLease, io::Error> {
      71      4930954 :         self.rw.read_blk(blknum, ctx).await
      72      4930954 :     }
      73              : 
      74      5086244 :     pub(crate) async fn write_blob(
      75      5086244 :         &mut self,
      76      5086244 :         srcbuf: &[u8],
      77      5086244 :         ctx: &RequestContext,
      78      5086244 :     ) -> Result<u64, io::Error> {
      79      5086244 :         let pos = self.rw.bytes_written();
      80      5086244 : 
      81      5086244 :         // Write the length field
      82      5086244 :         if srcbuf.len() < 0x80 {
      83              :             // short one-byte length header
      84      4932426 :             let len_buf = [srcbuf.len() as u8];
      85      4932426 : 
      86      4932426 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      87              :         } else {
      88       153818 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
      89       153818 :             len_buf[0] |= 0x80;
      90       153818 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      91              :         }
      92              : 
      93              :         // Write the payload
      94      5086244 :         self.rw.write_all_borrowed(srcbuf, ctx).await?;
      95              : 
      96      5086244 :         Ok(pos)
      97      5086244 :     }
      98              : }
      99              : 
     100              : /// Does the given filename look like an ephemeral file?
     101            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     102            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     103            0 :         rest.parse::<u32>().is_ok()
     104              :     } else {
     105            0 :         false
     106              :     }
     107            0 : }
     108              : 
     109              : impl BlockReader for EphemeralFile {
     110       606419 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     111       606419 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     112       606419 :     }
     113              : }
     114              : 
     115              : #[cfg(test)]
     116              : mod tests {
     117              :     use super::*;
     118              :     use crate::context::DownloadBehavior;
     119              :     use crate::task_mgr::TaskKind;
     120              :     use crate::tenant::block_io::BlockReaderRef;
     121              :     use rand::{thread_rng, RngCore};
     122              :     use std::fs;
     123              :     use std::str::FromStr;
     124              : 
     125            2 :     fn harness(
     126            2 :         test_name: &str,
     127            2 :     ) -> Result<
     128            2 :         (
     129            2 :             &'static PageServerConf,
     130            2 :             TenantShardId,
     131            2 :             TimelineId,
     132            2 :             RequestContext,
     133            2 :         ),
     134            2 :         io::Error,
     135            2 :     > {
     136            2 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     137            2 :         let _ = fs::remove_dir_all(&repo_dir);
     138            2 :         let conf = PageServerConf::dummy_conf(repo_dir);
     139            2 :         // Make a static copy of the config. This can never be free'd, but that's
     140            2 :         // OK in a test.
     141            2 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     142            2 : 
     143            2 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     144            2 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     145            2 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     146              : 
     147            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     148            2 : 
     149            2 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     150            2 :     }
     151              : 
     152              :     #[tokio::test]
     153            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     154            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     155            2 : 
     156            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
     157            2 : 
     158            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     159            2 :         assert_eq!(
     160            2 :             b"foo",
     161            2 :             file.block_cursor()
     162            2 :                 .read_blob(pos_foo, &ctx)
     163            2 :                 .await?
     164            2 :                 .as_slice()
     165            2 :         );
     166            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     167            2 :         assert_eq!(
     168            2 :             b"foo",
     169            2 :             file.block_cursor()
     170            2 :                 .read_blob(pos_foo, &ctx)
     171            2 :                 .await?
     172            2 :                 .as_slice()
     173            2 :         );
     174            2 :         assert_eq!(
     175            2 :             b"bar",
     176            2 :             file.block_cursor()
     177            2 :                 .read_blob(pos_bar, &ctx)
     178            2 :                 .await?
     179            2 :                 .as_slice()
     180            2 :         );
     181            2 : 
     182            2 :         let mut blobs = Vec::new();
     183        20002 :         for i in 0..10000 {
     184        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     185        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     186        20000 :             blobs.push((pos, data));
     187            2 :         }
     188            2 :         // also test with a large blobs
     189          202 :         for i in 0..100 {
     190          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     191          200 :             let pos = file.write_blob(&data, &ctx).await?;
     192          200 :             blobs.push((pos, data));
     193            2 :         }
     194            2 : 
     195            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     196        20202 :         for (pos, expected) in blobs {
     197        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     198        20200 :             assert_eq!(actual, expected);
     199            2 :         }
     200            2 : 
     201            2 :         // Test a large blob that spans multiple pages
     202            2 :         let mut large_data = vec![0; 20000];
     203            2 :         thread_rng().fill_bytes(&mut large_data);
     204            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     205            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     206            2 :         assert_eq!(result, large_data);
     207            2 : 
     208            2 :         Ok(())
     209            2 :     }
     210              : }
        

Generated by: LCOV version 2.1-beta