LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 96.0 % 351 337
Test Date: 2024-09-19 20:36:02 Functions: 92.3 % 26 24

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
       5              : use crate::config::PageServerConf;
       6              : use crate::context::RequestContext;
       7              : use crate::page_cache;
       8              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
       9              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      10              : use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
      11              : use crate::virtual_file::owned_buffers_io::write::Buffer;
      12              : use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
      13              : use bytes::BytesMut;
      14              : use camino::Utf8PathBuf;
      15              : use num_traits::Num;
      16              : use pageserver_api::shard::TenantShardId;
      17              : use tokio_epoll_uring::{BoundedBuf, Slice};
      18              : use tracing::error;
      19              : 
      20              : use std::io;
      21              : use std::sync::atomic::AtomicU64;
      22              : use utils::id::TimelineId;
      23              : 
      24              : pub struct EphemeralFile {
      25              :     _tenant_shard_id: TenantShardId,
      26              :     _timeline_id: TimelineId,
      27              :     page_cache_file_id: page_cache::FileId,
      28              :     bytes_written: u64,
      29              :     buffered_writer: owned_buffers_io::write::BufferedWriter<
      30              :         BytesMut,
      31              :         size_tracking_writer::Writer<VirtualFile>,
      32              :     >,
      33              :     /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
      34              :     _gate_guard: utils::sync::gate::GateGuard,
      35              : }
      36              : 
      37              : const TAIL_SZ: usize = 64 * 1024;
      38              : 
      39              : impl EphemeralFile {
      40         3840 :     pub async fn create(
      41         3840 :         conf: &PageServerConf,
      42         3840 :         tenant_shard_id: TenantShardId,
      43         3840 :         timeline_id: TimelineId,
      44         3840 :         gate_guard: utils::sync::gate::GateGuard,
      45         3840 :         ctx: &RequestContext,
      46         3840 :     ) -> Result<EphemeralFile, io::Error> {
      47              :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      48         3840 :         let filename_disambiguator =
      49         3840 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      50         3840 : 
      51         3840 :         let filename = conf
      52         3840 :             .timeline_path(&tenant_shard_id, &timeline_id)
      53         3840 :             .join(Utf8PathBuf::from(format!(
      54         3840 :                 "ephemeral-{filename_disambiguator}"
      55         3840 :             )));
      56              : 
      57         3840 :         let file = VirtualFile::open_with_options(
      58         3840 :             &filename,
      59         3840 :             virtual_file::OpenOptions::new()
      60         3840 :                 .read(true)
      61         3840 :                 .write(true)
      62         3840 :                 .create(true),
      63         3840 :             ctx,
      64         3840 :         )
      65         2184 :         .await?;
      66              : 
      67         3840 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      68         3840 : 
      69         3840 :         Ok(EphemeralFile {
      70         3840 :             _tenant_shard_id: tenant_shard_id,
      71         3840 :             _timeline_id: timeline_id,
      72         3840 :             page_cache_file_id,
      73         3840 :             bytes_written: 0,
      74         3840 :             buffered_writer: owned_buffers_io::write::BufferedWriter::new(
      75         3840 :                 size_tracking_writer::Writer::new(file),
      76         3840 :                 BytesMut::with_capacity(TAIL_SZ),
      77         3840 :             ),
      78         3840 :             _gate_guard: gate_guard,
      79         3840 :         })
      80         3840 :     }
      81              : }
      82              : 
      83              : impl Drop for EphemeralFile {
      84         3456 :     fn drop(&mut self) {
      85         3456 :         // unlink the file
      86         3456 :         // we are clear to do this, because we have entered a gate
      87         3456 :         let path = &self.buffered_writer.as_inner().as_inner().path;
      88         3456 :         let res = std::fs::remove_file(path);
      89         3456 :         if let Err(e) = res {
      90            6 :             if e.kind() != std::io::ErrorKind::NotFound {
      91              :                 // just never log the not found errors, we cannot do anything for them; on detach
      92              :                 // the tenant directory is already gone.
      93              :                 //
      94              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
      95            0 :                 error!("could not remove ephemeral file '{path}': {e}");
      96            6 :             }
      97         3450 :         }
      98         3456 :     }
      99              : }
     100              : 
     101              : impl EphemeralFile {
     102     28832028 :     pub(crate) fn len(&self) -> u64 {
     103     28832028 :         self.bytes_written
     104     28832028 :     }
     105              : 
     106         3816 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     107         3816 :         self.page_cache_file_id
     108         3816 :     }
     109              : 
     110         2910 :     pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
     111         2910 :         let size = self.len().into_usize();
     112         2910 :         let vec = Vec::with_capacity(size);
     113         2910 :         let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
     114         2910 :         assert_eq!(nread, size);
     115         2910 :         let vec = slice.into_inner();
     116         2910 :         assert_eq!(vec.len(), nread);
     117         2910 :         assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
     118         2910 :         Ok(vec)
     119         2910 :     }
     120              : 
     121              :     /// Returns the offset at which the first byte of the input was written, for use
     122              :     /// in constructing indices over the written value.
     123              :     ///
     124              :     /// Panics if the write is short because there's no way we can recover from that.
     125              :     /// TODO: make upstack handle this as an error.
     126     15002484 :     pub(crate) async fn write_raw(
     127     15002484 :         &mut self,
     128     15002484 :         srcbuf: &[u8],
     129     15002484 :         ctx: &RequestContext,
     130     15002484 :     ) -> std::io::Result<u64> {
     131     15002484 :         let pos = self.bytes_written;
     132              : 
     133     15002484 :         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
     134            0 :             std::io::Error::new(
     135            0 :                 std::io::ErrorKind::Other,
     136            0 :                 format!(
     137            0 :                     "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
     138            0 :                     srcbuf_len = srcbuf.len(),
     139            0 :                 ),
     140            0 :             )
     141     15002484 :         })?;
     142              : 
     143              :         // Write the payload
     144     15002484 :         let nwritten = self
     145     15002484 :             .buffered_writer
     146     15002484 :             .write_buffered_borrowed(srcbuf, ctx)
     147        10542 :             .await?;
     148     15002484 :         assert_eq!(
     149     15002484 :             nwritten,
     150     15002484 :             srcbuf.len(),
     151            0 :             "buffered writer has no short writes"
     152              :         );
     153              : 
     154     15002484 :         self.bytes_written = new_bytes_written;
     155     15002484 : 
     156     15002484 :         Ok(pos)
     157     15002484 :     }
     158              : }
     159              : 
     160              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     161      2083239 :     async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
     162      2083239 :         &'b self,
     163      2083239 :         start: u64,
     164      2083239 :         dst: tokio_epoll_uring::Slice<B>,
     165      2083239 :         ctx: &'a RequestContext,
     166      2083239 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
     167      2083239 :         let file_size_tracking_writer = self.buffered_writer.as_inner();
     168      2083239 :         let flushed_offset = file_size_tracking_writer.bytes_written();
     169      2083239 : 
     170      2083239 :         let buffer = self.buffered_writer.inspect_buffer();
     171      2083239 :         let buffered = &buffer[0..buffer.pending()];
     172      2083239 : 
     173      2083239 :         let dst_cap = dst.bytes_total().into_u64();
     174      2083239 :         let end = {
     175              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     176              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     177      2083239 :             let mut end: u64 = start.saturating_add(dst_cap);
     178      2083239 :             if end > self.bytes_written {
     179       830550 :                 end = self.bytes_written;
     180      1252689 :             }
     181      2083239 :             end
     182              :         };
     183              : 
     184              :         // inclusive, exclusive
     185              :         #[derive(Debug)]
     186              :         struct Range<N>(N, N);
     187              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     188      8621409 :             fn len(&self) -> N {
     189      8621409 :                 if self.0 > self.1 {
     190      3215919 :                     N::zero()
     191              :                 } else {
     192      5405490 :                     self.1 - self.0
     193              :                 }
     194      8621409 :             }
     195              :         }
     196      2083239 :         let written_range = Range(start, std::cmp::min(end, flushed_offset));
     197      2083239 :         let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
     198              : 
     199      2083239 :         let dst = if written_range.len() > 0 {
     200       902331 :             let file: &VirtualFile = file_size_tracking_writer.as_inner();
     201       902331 :             let bounds = dst.bounds();
     202       902331 :             let slice = file
     203       902331 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     204       458066 :                 .await?;
     205       902331 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     206              :         } else {
     207      1180908 :             dst
     208              :         };
     209              : 
     210      2083239 :         let dst = if buffered_range.len() > 0 {
     211      1184200 :             let offset_in_buffer = buffered_range
     212      1184200 :                 .0
     213      1184200 :                 .checked_sub(flushed_offset)
     214      1184200 :                 .unwrap()
     215      1184200 :                 .into_usize();
     216      1184200 :             let to_copy =
     217      1184200 :                 &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
     218      1184200 :             let bounds = dst.bounds();
     219      1184200 :             let mut view = dst.slice({
     220      1184200 :                 let start = written_range.len().into_usize();
     221      1184200 :                 let end = start
     222      1184200 :                     .checked_add(buffered_range.len().into_usize())
     223      1184200 :                     .unwrap();
     224      1184200 :                 start..end
     225      1184200 :             });
     226      1184200 :             view.as_mut_rust_slice_full_zeroed()
     227      1184200 :                 .copy_from_slice(to_copy);
     228      1184200 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     229              :         } else {
     230       899039 :             dst
     231              :         };
     232              : 
     233              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     234              : 
     235      2083239 :         Ok((dst, (end - start).into_usize()))
     236      2083239 :     }
     237              : }
     238              : 
     239              : /// Does the given filename look like an ephemeral file?
     240            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     241            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     242            0 :         rest.parse::<u32>().is_ok()
     243              :     } else {
     244            0 :         false
     245              :     }
     246            0 : }
     247              : 
     248              : #[cfg(test)]
     249              : mod tests {
     250              :     use rand::Rng;
     251              : 
     252              :     use super::*;
     253              :     use crate::context::DownloadBehavior;
     254              :     use crate::task_mgr::TaskKind;
     255              :     use std::fs;
     256              :     use std::str::FromStr;
     257              : 
     258           24 :     fn harness(
     259           24 :         test_name: &str,
     260           24 :     ) -> Result<
     261           24 :         (
     262           24 :             &'static PageServerConf,
     263           24 :             TenantShardId,
     264           24 :             TimelineId,
     265           24 :             RequestContext,
     266           24 :         ),
     267           24 :         io::Error,
     268           24 :     > {
     269           24 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     270           24 :         let _ = fs::remove_dir_all(&repo_dir);
     271           24 :         let conf = PageServerConf::dummy_conf(repo_dir);
     272           24 :         // Make a static copy of the config. This can never be free'd, but that's
     273           24 :         // OK in a test.
     274           24 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     275           24 : 
     276           24 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     277           24 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     278           24 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     279              : 
     280           24 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     281           24 : 
     282           24 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     283           24 :     }
     284              : 
     285              :     #[tokio::test]
     286            6 :     async fn ephemeral_file_holds_gate_open() {
     287            6 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     288            6 : 
     289            6 :         let (conf, tenant_id, timeline_id, ctx) =
     290            6 :             harness("ephemeral_file_holds_gate_open").unwrap();
     291            6 : 
     292            6 :         let gate = utils::sync::gate::Gate::default();
     293            6 : 
     294            6 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     295            6 :             .await
     296            6 :             .unwrap();
     297            6 : 
     298            6 :         let mut closing = tokio::task::spawn(async move {
     299           12 :             gate.close().await;
     300            6 :         });
     301            6 : 
     302            6 :         // gate is entered until the ephemeral file is dropped
      303            6 :         // do not start paused: tokio-epoll-uring has a sleep loop
     304            6 :         tokio::time::pause();
     305            6 :         tokio::time::timeout(FOREVER, &mut closing)
     306            6 :             .await
     307            6 :             .expect_err("closing cannot complete before dropping");
     308            6 : 
     309            6 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     310            6 :         // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
     311            6 :         drop(file);
     312            6 : 
     313            6 :         tokio::time::timeout(FOREVER, &mut closing)
     314            6 :             .await
     315            6 :             .expect("closing completes right away")
     316            6 :             .expect("closing does not panic");
     317            6 :     }
     318              : 
     319              :     #[tokio::test]
     320            6 :     async fn test_ephemeral_file_basics() {
     321            6 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     322            6 : 
     323            6 :         let gate = utils::sync::gate::Gate::default();
     324            6 : 
     325            6 :         let mut file =
     326            6 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     327            6 :                 .await
     328            6 :                 .unwrap();
     329            6 : 
     330            6 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     331            6 : 
     332            6 :         let write_nbytes = cap + cap / 2;
     333            6 : 
     334            6 :         let content: Vec<u8> = rand::thread_rng()
     335            6 :             .sample_iter(rand::distributions::Standard)
     336            6 :             .take(write_nbytes)
     337            6 :             .collect();
     338            6 : 
     339            6 :         let mut value_offsets = Vec::new();
     340       589824 :         for i in 0..write_nbytes {
     341       589824 :             let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
     342       589824 :             value_offsets.push(off);
     343            6 :         }
     344            6 : 
     345            6 :         assert!(file.len() as usize == write_nbytes);
     346       589824 :         for i in 0..write_nbytes {
     347       589824 :             assert_eq!(value_offsets[i], i.into_u64());
     348       589824 :             let buf = Vec::with_capacity(1);
     349       589824 :             let (buf_slice, nread) = file
     350       589824 :                 .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
     351       199680 :                 .await
     352       589824 :                 .unwrap();
     353       589824 :             let buf = buf_slice.into_inner();
     354       589824 :             assert_eq!(nread, 1);
     355       589824 :             assert_eq!(&buf, &content[i..i + 1]);
     356            6 :         }
     357            6 : 
     358            6 :         let file_contents =
     359            6 :             std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
     360            6 :         assert_eq!(file_contents, &content[0..cap]);
     361            6 : 
     362            6 :         let buffer_contents = file.buffered_writer.inspect_buffer();
     363            6 :         assert_eq!(buffer_contents, &content[cap..write_nbytes]);
     364            6 :     }
     365              : 
     366              :     #[tokio::test]
     367            6 :     async fn test_flushes_do_happen() {
     368            6 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     369            6 : 
     370            6 :         let gate = utils::sync::gate::Gate::default();
     371            6 : 
     372            6 :         let mut file =
     373            6 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     374            6 :                 .await
     375            6 :                 .unwrap();
     376            6 : 
     377            6 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     378            6 : 
     379            6 :         let content: Vec<u8> = rand::thread_rng()
     380            6 :             .sample_iter(rand::distributions::Standard)
     381            6 :             .take(cap + cap / 2)
     382            6 :             .collect();
     383            6 : 
     384            6 :         file.write_raw(&content, &ctx).await.unwrap();
     385            6 : 
     386            6 :         // assert the state is as this test expects it to be
     387            6 :         assert_eq!(
     388            6 :             &file.load_to_vec(&ctx).await.unwrap(),
     389            6 :             &content[0..cap + cap / 2]
     390            6 :         );
     391            6 :         let md = file
     392            6 :             .buffered_writer
     393            6 :             .as_inner()
     394            6 :             .as_inner()
     395            6 :             .path
     396            6 :             .metadata()
     397            6 :             .unwrap();
     398            6 :         assert_eq!(
     399            6 :             md.len(),
     400            6 :             cap.into_u64(),
     401            6 :             "buffered writer does one write if we write 1.5x buffer capacity"
     402            6 :         );
     403            6 :         assert_eq!(
     404            6 :             &file.buffered_writer.inspect_buffer()[0..cap / 2],
     405            6 :             &content[cap..cap + cap / 2]
     406            6 :         );
     407            6 :     }
     408              : 
     409              :     #[tokio::test]
     410            6 :     async fn test_read_split_across_file_and_buffer() {
     411            6 :         // This test exercises the logic on the read path that splits the logical read
     412            6 :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     413            6 :         //
      414            6 :         // This test builds on the assertions in test_flushes_do_happen
     415            6 : 
     416            6 :         let (conf, tenant_id, timeline_id, ctx) =
     417            6 :             harness("test_read_split_across_file_and_buffer").unwrap();
     418            6 : 
     419            6 :         let gate = utils::sync::gate::Gate::default();
     420            6 : 
     421            6 :         let mut file =
     422            6 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     423            6 :                 .await
     424            6 :                 .unwrap();
     425            6 : 
     426            6 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     427            6 : 
     428            6 :         let content: Vec<u8> = rand::thread_rng()
     429            6 :             .sample_iter(rand::distributions::Standard)
     430            6 :             .take(cap + cap / 2)
     431            6 :             .collect();
     432            6 : 
     433            6 :         file.write_raw(&content, &ctx).await.unwrap();
     434            6 : 
     435           30 :         let test_read = |start: usize, len: usize| {
     436           30 :             let file = &file;
     437           30 :             let ctx = &ctx;
     438           30 :             let content = &content;
     439           30 :             async move {
     440           30 :                 let (buf, nread) = file
     441           30 :                     .read_exact_at_eof_ok(
     442           30 :                         start.into_u64(),
     443           30 :                         Vec::with_capacity(len).slice_full(),
     444           30 :                         ctx,
     445           30 :                     )
     446            9 :                     .await
     447           30 :                     .unwrap();
     448           30 :                 assert_eq!(nread, len);
     449           30 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     450           30 :             }
     451           30 :         };
     452            6 : 
     453            6 :         // completely within the file range
     454            6 :         assert!(20 < cap, "test assumption");
     455            6 :         test_read(10, 10).await;
     456            6 :         // border onto edge of file
     457            6 :         test_read(cap - 10, 10).await;
     458            6 :         // read across file and buffer
     459            6 :         test_read(cap - 10, 20).await;
     460            6 :         // stay from start of buffer
     461            6 :         test_read(cap, 10).await;
     462            6 :         // completely within buffer
     463            6 :         test_read(cap + 10, 10).await;
     464            6 :     }
     465              : }
        

Generated by: LCOV version 2.1-beta