LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 97.4 % 191 186
Test Date: 2024-08-21 17:32:46 Functions: 94.4 % 18 17

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::config::PageServerConf;
       5              : use crate::context::RequestContext;
       6              : use crate::page_cache;
       7              : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
       8              : use crate::virtual_file::{self, VirtualFile};
       9              : use camino::Utf8PathBuf;
      10              : use pageserver_api::shard::TenantShardId;
      11              : 
      12              : use std::io;
      13              : use std::sync::atomic::AtomicU64;
      14              : use utils::id::TimelineId;
      15              : 
/// Append-only file used to keep in-memory layers spilled on disk.
///
/// Every operation exposed by this type is forwarded to the inner
/// [`page_caching::RW`], which is built from the opened file and the gate
/// guard handed to [`EphemeralFile::create`].
pub struct EphemeralFile {
    // The two ids are recorded at creation time. Nothing visible in this file
    // reads them back, hence the leading underscore.
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,

    /// Reader/writer over the backing file; all reads and writes go through it.
    rw: page_caching::RW,
}
      22              : 
      23              : mod page_caching;
      24              : mod zero_padded_read_write;
      25              : 
      26              : impl EphemeralFile {
      27         1268 :     pub async fn create(
      28         1268 :         conf: &PageServerConf,
      29         1268 :         tenant_shard_id: TenantShardId,
      30         1268 :         timeline_id: TimelineId,
      31         1268 :         gate_guard: utils::sync::gate::GateGuard,
      32         1268 :         ctx: &RequestContext,
      33         1268 :     ) -> Result<EphemeralFile, io::Error> {
      34         1268 :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      35         1268 :         let filename_disambiguator =
      36         1268 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      37         1268 : 
      38         1268 :         let filename = conf
      39         1268 :             .timeline_path(&tenant_shard_id, &timeline_id)
      40         1268 :             .join(Utf8PathBuf::from(format!(
      41         1268 :                 "ephemeral-{filename_disambiguator}"
      42         1268 :             )));
      43              : 
      44         1268 :         let file = VirtualFile::open_with_options(
      45         1268 :             &filename,
      46         1268 :             virtual_file::OpenOptions::new()
      47         1268 :                 .read(true)
      48         1268 :                 .write(true)
      49         1268 :                 .create(true),
      50         1268 :             ctx,
      51         1268 :         )
      52          716 :         .await?;
      53              : 
      54         1268 :         Ok(EphemeralFile {
      55         1268 :             _tenant_shard_id: tenant_shard_id,
      56         1268 :             _timeline_id: timeline_id,
      57         1268 :             rw: page_caching::RW::new(file, gate_guard),
      58         1268 :         })
      59         1268 :     }
      60              : 
      61      5092842 :     pub(crate) fn len(&self) -> u64 {
      62      5092842 :         self.rw.bytes_written()
      63      5092842 :     }
      64              : 
      65         1264 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
      66         1264 :         self.rw.page_cache_file_id()
      67         1264 :     }
      68              : 
      69              :     /// See [`self::page_caching::RW::load_to_vec`].
      70          968 :     pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
      71          968 :         self.rw.load_to_vec(ctx).await
      72          968 :     }
      73              : 
      74       524745 :     pub(crate) async fn read_blk(
      75       524745 :         &self,
      76       524745 :         blknum: u32,
      77       524745 :         ctx: &RequestContext,
      78       524745 :     ) -> Result<BlockLease, io::Error> {
      79       524745 :         self.rw.read_blk(blknum, ctx).await
      80       524745 :     }
      81              : 
      82      5110816 :     pub(crate) async fn write_blob(
      83      5110816 :         &mut self,
      84      5110816 :         srcbuf: &[u8],
      85      5110816 :         ctx: &RequestContext,
      86      5110816 :     ) -> Result<u64, io::Error> {
      87      5110816 :         let pos = self.rw.bytes_written();
      88      5110816 : 
      89      5110816 :         // Write the length field
      90      5110816 :         if srcbuf.len() < 0x80 {
      91              :             // short one-byte length header
      92      4956998 :             let len_buf = [srcbuf.len() as u8];
      93      4956998 : 
      94      4956998 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      95              :         } else {
      96       153818 :             let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
      97       153818 :             len_buf[0] |= 0x80;
      98       153818 :             self.rw.write_all_borrowed(&len_buf, ctx).await?;
      99              :         }
     100              : 
     101              :         // Write the payload
     102      5110816 :         self.rw.write_all_borrowed(srcbuf, ctx).await?;
     103              : 
     104      5110816 :         Ok(pos)
     105      5110816 :     }
     106              : }
     107              : 
     108              : /// Does the given filename look like an ephemeral file?
     109            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     110            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     111            0 :         rest.parse::<u32>().is_ok()
     112              :     } else {
     113            0 :         false
     114              :     }
     115            0 : }
     116              : 
     117              : impl BlockReader for EphemeralFile {
     118       606151 :     fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
     119       606151 :         BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
     120       606151 :     }
     121              : }
     122              : 
     123              : #[cfg(test)]
     124              : mod tests {
     125              :     use super::*;
     126              :     use crate::context::DownloadBehavior;
     127              :     use crate::task_mgr::TaskKind;
     128              :     use crate::tenant::block_io::BlockReaderRef;
     129              :     use rand::{thread_rng, RngCore};
     130              :     use std::fs;
     131              :     use std::str::FromStr;
     132              : 
     133            4 :     fn harness(
     134            4 :         test_name: &str,
     135            4 :     ) -> Result<
     136            4 :         (
     137            4 :             &'static PageServerConf,
     138            4 :             TenantShardId,
     139            4 :             TimelineId,
     140            4 :             RequestContext,
     141            4 :         ),
     142            4 :         io::Error,
     143            4 :     > {
     144            4 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     145            4 :         let _ = fs::remove_dir_all(&repo_dir);
     146            4 :         let conf = PageServerConf::dummy_conf(repo_dir);
     147            4 :         // Make a static copy of the config. This can never be free'd, but that's
     148            4 :         // OK in a test.
     149            4 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     150            4 : 
     151            4 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     152            4 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     153            4 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     154              : 
     155            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     156            4 : 
     157            4 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     158            4 :     }
     159              : 
     160              :     #[tokio::test]
     161            2 :     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
     162            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
     163            2 : 
     164            2 :         let gate = utils::sync::gate::Gate::default();
     165            2 : 
     166            2 :         let entered = gate.enter().unwrap();
     167            2 : 
     168            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
     169            2 : 
     170            2 :         let pos_foo = file.write_blob(b"foo", &ctx).await?;
     171            2 :         assert_eq!(
     172            2 :             b"foo",
     173            2 :             file.block_cursor()
     174            2 :                 .read_blob(pos_foo, &ctx)
     175            2 :                 .await?
     176            2 :                 .as_slice()
     177            2 :         );
     178            2 :         let pos_bar = file.write_blob(b"bar", &ctx).await?;
     179            2 :         assert_eq!(
     180            2 :             b"foo",
     181            2 :             file.block_cursor()
     182            2 :                 .read_blob(pos_foo, &ctx)
     183            2 :                 .await?
     184            2 :                 .as_slice()
     185            2 :         );
     186            2 :         assert_eq!(
     187            2 :             b"bar",
     188            2 :             file.block_cursor()
     189            2 :                 .read_blob(pos_bar, &ctx)
     190            2 :                 .await?
     191            2 :                 .as_slice()
     192            2 :         );
     193            2 : 
     194            2 :         let mut blobs = Vec::new();
     195        20002 :         for i in 0..10000 {
     196        20000 :             let data = Vec::from(format!("blob{}", i).as_bytes());
     197        20000 :             let pos = file.write_blob(&data, &ctx).await?;
     198        20000 :             blobs.push((pos, data));
     199            2 :         }
     200            2 :         // also test with a large blobs
     201          202 :         for i in 0..100 {
     202          200 :             let data = format!("blob{}", i).as_bytes().repeat(100);
     203          200 :             let pos = file.write_blob(&data, &ctx).await?;
     204          200 :             blobs.push((pos, data));
     205            2 :         }
     206            2 : 
     207            2 :         let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
     208        20202 :         for (pos, expected) in blobs {
     209        20200 :             let actual = cursor.read_blob(pos, &ctx).await?;
     210        20200 :             assert_eq!(actual, expected);
     211            2 :         }
     212            2 : 
     213            2 :         // Test a large blob that spans multiple pages
     214            2 :         let mut large_data = vec![0; 20000];
     215            2 :         thread_rng().fill_bytes(&mut large_data);
     216            2 :         let pos_large = file.write_blob(&large_data, &ctx).await?;
     217            2 :         let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
     218            2 :         assert_eq!(result, large_data);
     219            2 : 
     220            2 :         Ok(())
     221            2 :     }
     222              : 
     223              :     #[tokio::test]
     224            2 :     async fn ephemeral_file_holds_gate_open() {
     225            2 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     226            2 : 
     227            2 :         let (conf, tenant_id, timeline_id, ctx) =
     228            2 :             harness("ephemeral_file_holds_gate_open").unwrap();
     229            2 : 
     230            2 :         let gate = utils::sync::gate::Gate::default();
     231            2 : 
     232            2 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     233            2 :             .await
     234            2 :             .unwrap();
     235            2 : 
     236            2 :         let mut closing = tokio::task::spawn(async move {
     237            4 :             gate.close().await;
     238            2 :         });
     239            2 : 
     240            2 :         // gate is entered until the ephemeral file is dropped
     241            2 :         // do not start paused tokio-epoll-uring has a sleep loop
     242            2 :         tokio::time::pause();
     243            2 :         tokio::time::timeout(FOREVER, &mut closing)
     244            2 :             .await
     245            2 :             .expect_err("closing cannot complete before dropping");
     246            2 : 
     247            2 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     248            2 :         // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
     249            2 :         drop(file);
     250            2 : 
     251            2 :         tokio::time::timeout(FOREVER, &mut closing)
     252            2 :             .await
     253            2 :             .expect("closing completes right away")
     254            2 :             .expect("closing does not panic");
     255            2 :     }
     256              : }
        

Generated by: LCOV version 2.1-beta