LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 96.0 % 354 340
Test Date: 2024-10-22 22:13:45 Functions: 92.3 % 26 24

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
       5              : use crate::config::PageServerConf;
       6              : use crate::context::RequestContext;
       7              : use crate::page_cache;
       8              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
       9              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      10              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      11              : use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
      12              : use crate::virtual_file::owned_buffers_io::write::Buffer;
      13              : use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
      14              : use bytes::BytesMut;
      15              : use camino::Utf8PathBuf;
      16              : use num_traits::Num;
      17              : use pageserver_api::shard::TenantShardId;
      18              : use tokio_epoll_uring::{BoundedBuf, Slice};
      19              : use tracing::error;
      20              : 
      21              : use std::io;
      22              : use std::sync::atomic::AtomicU64;
      23              : use utils::id::TimelineId;
      24              : 
      25              : pub struct EphemeralFile {
      26              :     _tenant_shard_id: TenantShardId,
      27              :     _timeline_id: TimelineId,
      28              :     page_cache_file_id: page_cache::FileId,
      29              :     bytes_written: u64,
      30              :     buffered_writer: owned_buffers_io::write::BufferedWriter<
      31              :         BytesMut,
      32              :         size_tracking_writer::Writer<VirtualFile>,
      33              :     >,
      34              :     /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
      35              :     _gate_guard: utils::sync::gate::GateGuard,
      36              : }
      37              : 
      38              : const TAIL_SZ: usize = 64 * 1024;
      39              : 
      40              : impl EphemeralFile {
      41         1270 :     pub async fn create(
      42         1270 :         conf: &PageServerConf,
      43         1270 :         tenant_shard_id: TenantShardId,
      44         1270 :         timeline_id: TimelineId,
      45         1270 :         gate_guard: utils::sync::gate::GateGuard,
      46         1270 :         ctx: &RequestContext,
      47         1270 :     ) -> Result<EphemeralFile, io::Error> {
      48              :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      49         1270 :         let filename_disambiguator =
      50         1270 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      51         1270 : 
      52         1270 :         let filename = conf
      53         1270 :             .timeline_path(&tenant_shard_id, &timeline_id)
      54         1270 :             .join(Utf8PathBuf::from(format!(
      55         1270 :                 "ephemeral-{filename_disambiguator}"
      56         1270 :             )));
      57              : 
      58         1270 :         let file = VirtualFile::open_with_options(
      59         1270 :             &filename,
      60         1270 :             virtual_file::OpenOptions::new()
      61         1270 :                 .read(true)
      62         1270 :                 .write(true)
      63         1270 :                 .create(true),
      64         1270 :             ctx,
      65         1270 :         )
      66          721 :         .await?;
      67              : 
      68         1270 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      69         1270 : 
      70         1270 :         Ok(EphemeralFile {
      71         1270 :             _tenant_shard_id: tenant_shard_id,
      72         1270 :             _timeline_id: timeline_id,
      73         1270 :             page_cache_file_id,
      74         1270 :             bytes_written: 0,
      75         1270 :             buffered_writer: owned_buffers_io::write::BufferedWriter::new(
      76         1270 :                 size_tracking_writer::Writer::new(file),
      77         1270 :                 BytesMut::with_capacity(TAIL_SZ),
      78         1270 :             ),
      79         1270 :             _gate_guard: gate_guard,
      80         1270 :         })
      81         1270 :     }
      82              : }
      83              : 
      84              : impl Drop for EphemeralFile {
      85         1148 :     fn drop(&mut self) {
      86         1148 :         // unlink the file
      87         1148 :         // we are clear to do this, because we have entered a gate
      88         1148 :         let path = self.buffered_writer.as_inner().as_inner().path();
      89         1148 :         let res = std::fs::remove_file(path);
      90         1148 :         if let Err(e) = res {
      91            2 :             if e.kind() != std::io::ErrorKind::NotFound {
      92              :                 // just never log the not found errors, we cannot do anything for them; on detach
      93              :                 // the tenant directory is already gone.
      94              :                 //
      95              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
      96            0 :                 error!("could not remove ephemeral file '{path}': {e}");
      97            2 :             }
      98         1146 :         }
      99         1148 :     }
     100              : }
     101              : 
     102              : impl EphemeralFile {
     103      9610626 :     pub(crate) fn len(&self) -> u64 {
     104      9610626 :         self.bytes_written
     105      9610626 :     }
     106              : 
     107         1262 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     108         1262 :         self.page_cache_file_id
     109         1262 :     }
     110              : 
     111          970 :     pub(crate) async fn load_to_io_buf(
     112          970 :         &self,
     113          970 :         ctx: &RequestContext,
     114          970 :     ) -> Result<IoBufferMut, io::Error> {
     115          970 :         let size = self.len().into_usize();
     116          970 :         let buf = IoBufferMut::with_capacity(size);
     117          970 :         let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
     118          970 :         assert_eq!(nread, size);
     119          970 :         let buf = slice.into_inner();
     120          970 :         assert_eq!(buf.len(), nread);
     121          970 :         assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
     122          970 :         Ok(buf)
     123          970 :     }
     124              : 
     125              :     /// Returns the offset at which the first byte of the input was written, for use
     126              :     /// in constructing indices over the written value.
     127              :     ///
     128              :     /// Panics if the write is short because there's no way we can recover from that.
     129              :     /// TODO: make upstack handle this as an error.
     130      5000808 :     pub(crate) async fn write_raw(
     131      5000808 :         &mut self,
     132      5000808 :         srcbuf: &[u8],
     133      5000808 :         ctx: &RequestContext,
     134      5000808 :     ) -> std::io::Result<u64> {
     135      5000808 :         let pos = self.bytes_written;
     136              : 
     137      5000808 :         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
     138            0 :             std::io::Error::new(
     139            0 :                 std::io::ErrorKind::Other,
     140            0 :                 format!(
     141            0 :                     "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
     142            0 :                     srcbuf_len = srcbuf.len(),
     143            0 :                 ),
     144            0 :             )
     145      5000808 :         })?;
     146              : 
     147              :         // Write the payload
     148      5000808 :         let nwritten = self
     149      5000808 :             .buffered_writer
     150      5000808 :             .write_buffered_borrowed(srcbuf, ctx)
     151         3409 :             .await?;
     152      5000808 :         assert_eq!(
     153      5000808 :             nwritten,
     154      5000808 :             srcbuf.len(),
     155            0 :             "buffered writer has no short writes"
     156              :         );
     157              : 
     158      5000808 :         self.bytes_written = new_bytes_written;
     159      5000808 : 
     160      5000808 :         Ok(pos)
     161      5000808 :     }
     162              : }
     163              : 
     164              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     165       694329 :     async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
     166       694329 :         &'b self,
     167       694329 :         start: u64,
     168       694329 :         dst: tokio_epoll_uring::Slice<B>,
     169       694329 :         ctx: &'a RequestContext,
     170       694329 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
     171       694329 :         let file_size_tracking_writer = self.buffered_writer.as_inner();
     172       694329 :         let flushed_offset = file_size_tracking_writer.bytes_written();
     173       694329 : 
     174       694329 :         let buffer = self.buffered_writer.inspect_buffer();
     175       694329 :         let buffered = &buffer[0..buffer.pending()];
     176       694329 : 
     177       694329 :         let dst_cap = dst.bytes_total().into_u64();
     178       694329 :         let end = {
     179              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     180              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     181       694329 :             let mut end: u64 = start.saturating_add(dst_cap);
     182       694329 :             if end > self.bytes_written {
     183       276784 :                 end = self.bytes_written;
     184       417545 :             }
     185       694329 :             end
     186              :         };
     187              : 
     188              :         // inclusive, exclusive
     189              :         #[derive(Debug)]
     190              :         struct Range<N>(N, N);
     191              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     192      2873427 :             fn len(&self) -> N {
     193      2873427 :                 if self.0 > self.1 {
     194      1071919 :                     N::zero()
     195              :                 } else {
     196      1801508 :                     self.1 - self.0
     197              :                 }
     198      2873427 :             }
     199              :         }
     200       694329 :         let written_range = Range(start, std::cmp::min(end, flushed_offset));
     201       694329 :         let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
     202              : 
     203       694329 :         let dst = if written_range.len() > 0 {
     204       300762 :             let file: &VirtualFile = file_size_tracking_writer.as_inner();
     205       300762 :             let bounds = dst.bounds();
     206       300762 :             let slice = file
     207       300762 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     208       152640 :                 .await?;
     209       300762 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     210              :         } else {
     211       393567 :             dst
     212              :         };
     213              : 
     214       694329 :         let dst = if buffered_range.len() > 0 {
     215       394669 :             let offset_in_buffer = buffered_range
     216       394669 :                 .0
     217       394669 :                 .checked_sub(flushed_offset)
     218       394669 :                 .unwrap()
     219       394669 :                 .into_usize();
     220       394669 :             let to_copy =
     221       394669 :                 &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
     222       394669 :             let bounds = dst.bounds();
     223       394669 :             let mut view = dst.slice({
     224       394669 :                 let start = written_range.len().into_usize();
     225       394669 :                 let end = start
     226       394669 :                     .checked_add(buffered_range.len().into_usize())
     227       394669 :                     .unwrap();
     228       394669 :                 start..end
     229       394669 :             });
     230       394669 :             view.as_mut_rust_slice_full_zeroed()
     231       394669 :                 .copy_from_slice(to_copy);
     232       394669 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     233              :         } else {
     234       299660 :             dst
     235              :         };
     236              : 
     237              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     238              : 
     239       694329 :         Ok((dst, (end - start).into_usize()))
     240       694329 :     }
     241              : }
     242              : 
     243              : /// Does the given filename look like an ephemeral file?
     244            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     245            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     246            0 :         rest.parse::<u32>().is_ok()
     247              :     } else {
     248            0 :         false
     249              :     }
     250            0 : }
     251              : 
     252              : #[cfg(test)]
     253              : mod tests {
     254              :     use rand::Rng;
     255              : 
     256              :     use super::*;
     257              :     use crate::context::DownloadBehavior;
     258              :     use crate::task_mgr::TaskKind;
     259              :     use std::fs;
     260              :     use std::str::FromStr;
     261              : 
     262            8 :     fn harness(
     263            8 :         test_name: &str,
     264            8 :     ) -> Result<
     265            8 :         (
     266            8 :             &'static PageServerConf,
     267            8 :             TenantShardId,
     268            8 :             TimelineId,
     269            8 :             RequestContext,
     270            8 :         ),
     271            8 :         io::Error,
     272            8 :     > {
     273            8 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     274            8 :         let _ = fs::remove_dir_all(&repo_dir);
     275            8 :         let conf = PageServerConf::dummy_conf(repo_dir);
     276            8 :         // Make a static copy of the config. This can never be free'd, but that's
     277            8 :         // OK in a test.
     278            8 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     279            8 : 
     280            8 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     281            8 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     282            8 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     283              : 
     284            8 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     285            8 : 
     286            8 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     287            8 :     }
     288              : 
     289              :     #[tokio::test]
     290            2 :     async fn ephemeral_file_holds_gate_open() {
     291            2 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     292            2 : 
     293            2 :         let (conf, tenant_id, timeline_id, ctx) =
     294            2 :             harness("ephemeral_file_holds_gate_open").unwrap();
     295            2 : 
     296            2 :         let gate = utils::sync::gate::Gate::default();
     297            2 : 
     298            2 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     299            2 :             .await
     300            2 :             .unwrap();
     301            2 : 
     302            2 :         let mut closing = tokio::task::spawn(async move {
     303            4 :             gate.close().await;
     304            2 :         });
     305            2 : 
     306            2 :         // gate is entered until the ephemeral file is dropped
     307            2 :         // do not start paused tokio-epoll-uring has a sleep loop
     308            2 :         tokio::time::pause();
     309            2 :         tokio::time::timeout(FOREVER, &mut closing)
     310            2 :             .await
     311            2 :             .expect_err("closing cannot complete before dropping");
     312            2 : 
     313            2 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     314            2 :         // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
     315            2 :         drop(file);
     316            2 : 
     317            2 :         tokio::time::timeout(FOREVER, &mut closing)
     318            2 :             .await
     319            2 :             .expect("closing completes right away")
     320            2 :             .expect("closing does not panic");
     321            2 :     }
     322              : 
     323              :     #[tokio::test]
     324            2 :     async fn test_ephemeral_file_basics() {
     325            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     326            2 : 
     327            2 :         let gate = utils::sync::gate::Gate::default();
     328            2 : 
     329            2 :         let mut file =
     330            2 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     331            2 :                 .await
     332            2 :                 .unwrap();
     333            2 : 
     334            2 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     335            2 : 
     336            2 :         let write_nbytes = cap + cap / 2;
     337            2 : 
     338            2 :         let content: Vec<u8> = rand::thread_rng()
     339            2 :             .sample_iter(rand::distributions::Standard)
     340            2 :             .take(write_nbytes)
     341            2 :             .collect();
     342            2 : 
     343            2 :         let mut value_offsets = Vec::new();
     344       196608 :         for i in 0..write_nbytes {
     345       196608 :             let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
     346       196608 :             value_offsets.push(off);
     347            2 :         }
     348            2 : 
     349            2 :         assert!(file.len() as usize == write_nbytes);
     350       196608 :         for i in 0..write_nbytes {
     351       196608 :             assert_eq!(value_offsets[i], i.into_u64());
     352       196608 :             let buf = IoBufferMut::with_capacity(1);
     353       196608 :             let (buf_slice, nread) = file
     354       196608 :                 .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
     355        66560 :                 .await
     356       196608 :                 .unwrap();
     357       196608 :             let buf = buf_slice.into_inner();
     358       196608 :             assert_eq!(nread, 1);
     359       196608 :             assert_eq!(&buf, &content[i..i + 1]);
     360            2 :         }
     361            2 : 
     362            2 :         let file_contents =
     363            2 :             std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
     364            2 :         assert_eq!(file_contents, &content[0..cap]);
     365            2 : 
     366            2 :         let buffer_contents = file.buffered_writer.inspect_buffer();
     367            2 :         assert_eq!(buffer_contents, &content[cap..write_nbytes]);
     368            2 :     }
     369              : 
     370              :     #[tokio::test]
     371            2 :     async fn test_flushes_do_happen() {
     372            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     373            2 : 
     374            2 :         let gate = utils::sync::gate::Gate::default();
     375            2 : 
     376            2 :         let mut file =
     377            2 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     378            2 :                 .await
     379            2 :                 .unwrap();
     380            2 : 
     381            2 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     382            2 : 
     383            2 :         let content: Vec<u8> = rand::thread_rng()
     384            2 :             .sample_iter(rand::distributions::Standard)
     385            2 :             .take(cap + cap / 2)
     386            2 :             .collect();
     387            2 : 
     388            2 :         file.write_raw(&content, &ctx).await.unwrap();
     389            2 : 
     390            2 :         // assert the state is as this test expects it to be
     391            2 :         assert_eq!(
     392            2 :             &file.load_to_io_buf(&ctx).await.unwrap(),
     393            2 :             &content[0..cap + cap / 2]
     394            2 :         );
     395            2 :         let md = file
     396            2 :             .buffered_writer
     397            2 :             .as_inner()
     398            2 :             .as_inner()
     399            2 :             .path()
     400            2 :             .metadata()
     401            2 :             .unwrap();
     402            2 :         assert_eq!(
     403            2 :             md.len(),
     404            2 :             cap.into_u64(),
     405            2 :             "buffered writer does one write if we write 1.5x buffer capacity"
     406            2 :         );
     407            2 :         assert_eq!(
     408            2 :             &file.buffered_writer.inspect_buffer()[0..cap / 2],
     409            2 :             &content[cap..cap + cap / 2]
     410            2 :         );
     411            2 :     }
     412              : 
     413              :     #[tokio::test]
     414            2 :     async fn test_read_split_across_file_and_buffer() {
     415            2 :         // This test exercises the logic on the read path that splits the logical read
     416            2 :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     417            2 :         //
     418            2 :         // This test build on the assertions in test_flushes_do_happen
     419            2 : 
     420            2 :         let (conf, tenant_id, timeline_id, ctx) =
     421            2 :             harness("test_read_split_across_file_and_buffer").unwrap();
     422            2 : 
     423            2 :         let gate = utils::sync::gate::Gate::default();
     424            2 : 
     425            2 :         let mut file =
     426            2 :             EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
     427            2 :                 .await
     428            2 :                 .unwrap();
     429            2 : 
     430            2 :         let cap = file.buffered_writer.inspect_buffer().capacity();
     431            2 : 
     432            2 :         let content: Vec<u8> = rand::thread_rng()
     433            2 :             .sample_iter(rand::distributions::Standard)
     434            2 :             .take(cap + cap / 2)
     435            2 :             .collect();
     436            2 : 
     437            2 :         file.write_raw(&content, &ctx).await.unwrap();
     438            2 : 
     439           10 :         let test_read = |start: usize, len: usize| {
     440           10 :             let file = &file;
     441           10 :             let ctx = &ctx;
     442           10 :             let content = &content;
     443           10 :             async move {
     444           10 :                 let (buf, nread) = file
     445           10 :                     .read_exact_at_eof_ok(
     446           10 :                         start.into_u64(),
     447           10 :                         IoBufferMut::with_capacity(len).slice_full(),
     448           10 :                         ctx,
     449           10 :                     )
     450            3 :                     .await
     451           10 :                     .unwrap();
     452           10 :                 assert_eq!(nread, len);
     453           10 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     454           10 :             }
     455           10 :         };
     456            2 : 
     457            2 :         // completely within the file range
     458            2 :         assert!(20 < cap, "test assumption");
     459            2 :         test_read(10, 10).await;
     460            2 :         // border onto edge of file
     461            2 :         test_read(cap - 10, 10).await;
     462            2 :         // read across file and buffer
     463            2 :         test_read(cap - 10, 20).await;
     464            2 :         // stay from start of buffer
     465            2 :         test_read(cap, 10).await;
     466            2 :         // completely within buffer
     467            2 :         test_read(cap + 10, 10).await;
     468            2 :     }
     469              : }
        

Generated by: LCOV version 2.1-beta