LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 96.8 % 435 421
Test Date: 2025-01-07 20:58:07 Functions: 93.9 % 33 31

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
       5              : use crate::config::PageServerConf;
       6              : use crate::context::RequestContext;
       7              : use crate::page_cache;
       8              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
       9              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      10              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      11              : use crate::virtual_file::owned_buffers_io::write::Buffer;
      12              : use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
      13              : use camino::Utf8PathBuf;
      14              : use num_traits::Num;
      15              : use pageserver_api::shard::TenantShardId;
      16              : use tokio_epoll_uring::{BoundedBuf, Slice};
      17              : use tracing::error;
      18              : 
      19              : use std::io;
      20              : use std::sync::atomic::AtomicU64;
      21              : use std::sync::Arc;
      22              : use utils::id::TimelineId;
      23              : 
      24              : pub struct EphemeralFile {
      25              :     _tenant_shard_id: TenantShardId,
      26              :     _timeline_id: TimelineId,
      27              :     page_cache_file_id: page_cache::FileId,
      28              :     bytes_written: u64,
      29              :     buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
      30              :     /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
      31              :     _gate_guard: utils::sync::gate::GateGuard,
      32              : }
      33              : 
      34              : const TAIL_SZ: usize = 64 * 1024;
      35              : 
      36              : impl EphemeralFile {
      37         1280 :     pub async fn create(
      38         1280 :         conf: &PageServerConf,
      39         1280 :         tenant_shard_id: TenantShardId,
      40         1280 :         timeline_id: TimelineId,
      41         1280 :         gate: &utils::sync::gate::Gate,
      42         1280 :         ctx: &RequestContext,
      43         1280 :     ) -> anyhow::Result<EphemeralFile> {
      44              :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      45         1280 :         let filename_disambiguator =
      46         1280 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      47         1280 : 
      48         1280 :         let filename = conf
      49         1280 :             .timeline_path(&tenant_shard_id, &timeline_id)
      50         1280 :             .join(Utf8PathBuf::from(format!(
      51         1280 :                 "ephemeral-{filename_disambiguator}"
      52         1280 :             )));
      53              : 
      54         1280 :         let file = Arc::new(
      55         1280 :             VirtualFile::open_with_options_v2(
      56         1280 :                 &filename,
      57         1280 :                 virtual_file::OpenOptions::new()
      58         1280 :                     .read(true)
      59         1280 :                     .write(true)
      60         1280 :                     .create(true),
      61         1280 :                 ctx,
      62         1280 :             )
      63         1280 :             .await?,
      64              :         );
      65              : 
      66         1280 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      67         1280 : 
      68         1280 :         Ok(EphemeralFile {
      69         1280 :             _tenant_shard_id: tenant_shard_id,
      70         1280 :             _timeline_id: timeline_id,
      71         1280 :             page_cache_file_id,
      72         1280 :             bytes_written: 0,
      73         1280 :             buffered_writer: owned_buffers_io::write::BufferedWriter::new(
      74         1280 :                 file,
      75         2560 :                 || IoBufferMut::with_capacity(TAIL_SZ),
      76         1280 :                 gate.enter()?,
      77         1280 :                 ctx,
      78         1280 :             ),
      79         1280 :             _gate_guard: gate.enter()?,
      80              :         })
      81         1280 :     }
      82              : }
      83              : 
      84              : impl Drop for EphemeralFile {
      85         1158 :     fn drop(&mut self) {
      86         1158 :         // unlink the file
      87         1158 :         // we are clear to do this, because we have entered a gate
      88         1158 :         let path = self.buffered_writer.as_inner().path();
      89         1158 :         let res = std::fs::remove_file(path);
      90         1158 :         if let Err(e) = res {
      91            2 :             if e.kind() != std::io::ErrorKind::NotFound {
      92              :                 // just never log the not found errors, we cannot do anything for them; on detach
      93              :                 // the tenant directory is already gone.
      94              :                 //
      95              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
      96            0 :                 error!("could not remove ephemeral file '{path}': {e}");
      97            2 :             }
      98         1156 :         }
      99         1158 :     }
     100              : }
     101              : 
     102              : impl EphemeralFile {
     103      9610656 :     pub(crate) fn len(&self) -> u64 {
     104      9610656 :         self.bytes_written
     105      9610656 :     }
     106              : 
     107         1272 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     108         1272 :         self.page_cache_file_id
     109         1272 :     }
     110              : 
     111          970 :     pub(crate) async fn load_to_io_buf(
     112          970 :         &self,
     113          970 :         ctx: &RequestContext,
     114          970 :     ) -> Result<IoBufferMut, io::Error> {
     115          970 :         let size = self.len().into_usize();
     116          970 :         let buf = IoBufferMut::with_capacity(size);
     117          970 :         let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
     118          970 :         assert_eq!(nread, size);
     119          970 :         let buf = slice.into_inner();
     120          970 :         assert_eq!(buf.len(), nread);
     121          970 :         assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
     122          970 :         Ok(buf)
     123          970 :     }
     124              : 
     125              :     /// Returns the offset at which the first byte of the input was written, for use
     126              :     /// in constructing indices over the written value.
     127              :     ///
     128              :     /// Panics if the write is short because there's no way we can recover from that.
     129              :     /// TODO: make upstack handle this as an error.
     130      4804848 :     pub(crate) async fn write_raw(
     131      4804848 :         &mut self,
     132      4804848 :         srcbuf: &[u8],
     133      4804848 :         ctx: &RequestContext,
     134      4804848 :     ) -> std::io::Result<u64> {
     135      4804848 :         let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
     136      4804848 :         if let Some(control) = control {
     137         5534 :             control.release().await;
     138      4799314 :         }
     139      4804848 :         Ok(pos)
     140      4804848 :     }
     141              : 
     142      4804850 :     async fn write_raw_controlled(
     143      4804850 :         &mut self,
     144      4804850 :         srcbuf: &[u8],
     145      4804850 :         ctx: &RequestContext,
     146      4804850 :     ) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
     147      4804850 :         let pos = self.bytes_written;
     148              : 
     149      4804850 :         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
     150            0 :             std::io::Error::new(
     151            0 :                 std::io::ErrorKind::Other,
     152            0 :                 format!(
     153            0 :                     "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
     154            0 :                     srcbuf_len = srcbuf.len(),
     155            0 :                 ),
     156            0 :             )
     157      4804850 :         })?;
     158              : 
     159              :         // Write the payload
     160      4804850 :         let (nwritten, control) = self
     161      4804850 :             .buffered_writer
     162      4804850 :             .write_buffered_borrowed_controlled(srcbuf, ctx)
     163      4804850 :             .await?;
     164      4804850 :         assert_eq!(
     165      4804850 :             nwritten,
     166      4804850 :             srcbuf.len(),
     167            0 :             "buffered writer has no short writes"
     168              :         );
     169              : 
     170      4804850 :         self.bytes_written = new_bytes_written;
     171      4804850 : 
     172      4804850 :         Ok((pos, control))
     173      4804850 :     }
     174              : }
     175              : 
     176              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     177       498416 :     async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
     178       498416 :         &self,
     179       498416 :         start: u64,
     180       498416 :         dst: tokio_epoll_uring::Slice<B>,
     181       498416 :         ctx: &RequestContext,
     182       498416 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
     183       498416 :         let submitted_offset = self.buffered_writer.bytes_submitted();
     184       498416 : 
     185       498416 :         let mutable = self.buffered_writer.inspect_mutable();
     186       498416 :         let mutable = &mutable[0..mutable.pending()];
     187       498416 : 
     188       498416 :         let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
     189       498416 : 
     190       498416 :         let dst_cap = dst.bytes_total().into_u64();
     191       498416 :         let end = {
     192              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     193              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     194       498416 :             let mut end: u64 = start.saturating_add(dst_cap);
     195       498416 :             if end > self.bytes_written {
     196       276856 :                 end = self.bytes_written;
     197       276856 :             }
     198       498416 :             end
     199              :         };
     200              : 
     201              :         // inclusive, exclusive
     202              :         #[derive(Debug)]
     203              :         struct Range<N>(N, N);
     204              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     205      3304174 :             fn len(&self) -> N {
     206      3304174 :                 if self.0 > self.1 {
     207      1774301 :                     N::zero()
     208              :                 } else {
     209      1529873 :                     self.1 - self.0
     210              :                 }
     211      3304174 :             }
     212              :         }
     213              : 
     214       498416 :         let (written_range, maybe_flushed_range) = {
     215       498416 :             if maybe_flushed.is_some() {
     216              :                 // [       written       ][ maybe_flushed ][    mutable    ]
     217              :                 //                        <-   TAIL_SZ   -><-   TAIL_SZ   ->
     218              :                 //                                         ^
     219              :                 //                                 `submitted_offset`
     220              :                 // <++++++ on disk +++++++????????????????>
     221       488263 :                 (
     222       488263 :                     Range(
     223       488263 :                         start,
     224       488263 :                         std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     225       488263 :                     ),
     226       488263 :                     Range(
     227       488263 :                         std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     228       488263 :                         std::cmp::min(end, submitted_offset),
     229       488263 :                     ),
     230       488263 :                 )
     231              :             } else {
     232              :                 // [       written                        ][    mutable    ]
     233              :                 //                                         <-   TAIL_SZ   ->
     234              :                 //                                         ^
     235              :                 //                                 `submitted_offset`
     236              :                 // <++++++ on disk +++++++++++++++++++++++>
     237        10153 :                 (
     238        10153 :                     Range(start, std::cmp::min(end, submitted_offset)),
     239        10153 :                     // zero len
     240        10153 :                     Range(submitted_offset, u64::MIN),
     241        10153 :                 )
     242              :             }
     243              :         };
     244              : 
     245       498416 :         let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
     246              : 
     247       498416 :         let dst = if written_range.len() > 0 {
     248        10183 :             let file: &VirtualFile = self.buffered_writer.as_inner();
     249        10183 :             let bounds = dst.bounds();
     250        10183 :             let slice = file
     251        10183 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     252        10183 :                 .await?;
     253        10183 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     254              :         } else {
     255       488233 :             dst
     256              :         };
     257              : 
     258       498416 :         let dst = if maybe_flushed_range.len() > 0 {
     259       160413 :             let offset_in_buffer = maybe_flushed_range
     260       160413 :                 .0
     261       160413 :                 .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
     262       160413 :                 .unwrap()
     263       160413 :                 .into_usize();
     264       160413 :             // Checked previously the buffer is Some.
     265       160413 :             let maybe_flushed = maybe_flushed.unwrap();
     266       160413 :             let to_copy = &maybe_flushed
     267       160413 :                 [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
     268       160413 :             let bounds = dst.bounds();
     269       160413 :             let mut view = dst.slice({
     270       160413 :                 let start = written_range.len().into_usize();
     271       160413 :                 let end = start
     272       160413 :                     .checked_add(maybe_flushed_range.len().into_usize())
     273       160413 :                     .unwrap();
     274       160413 :                 start..end
     275       160413 :             });
     276       160413 :             view.as_mut_rust_slice_full_zeroed()
     277       160413 :                 .copy_from_slice(to_copy);
     278       160413 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     279              :         } else {
     280       338003 :             dst
     281              :         };
     282              : 
     283       498416 :         let dst = if mutable_range.len() > 0 {
     284       329376 :             let offset_in_buffer = mutable_range
     285       329376 :                 .0
     286       329376 :                 .checked_sub(submitted_offset)
     287       329376 :                 .unwrap()
     288       329376 :                 .into_usize();
     289       329376 :             let to_copy =
     290       329376 :                 &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
     291       329376 :             let bounds = dst.bounds();
     292       329376 :             let mut view = dst.slice({
     293       329376 :                 let start =
     294       329376 :                     written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
     295       329376 :                 let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
     296       329376 :                 start..end
     297       329376 :             });
     298       329376 :             view.as_mut_rust_slice_full_zeroed()
     299       329376 :                 .copy_from_slice(to_copy);
     300       329376 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     301              :         } else {
     302       169040 :             dst
     303              :         };
     304              : 
     305              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     306              : 
     307       498416 :         Ok((dst, (end - start).into_usize()))
     308       498416 :     }
     309              : }
     310              : 
     311              : /// Does the given filename look like an ephemeral file?
     312            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     313            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     314            0 :         rest.parse::<u32>().is_ok()
     315              :     } else {
     316            0 :         false
     317              :     }
     318            0 : }
     319              : 
     320              : #[cfg(test)]
     321              : mod tests {
     322              :     use rand::Rng;
     323              : 
     324              :     use super::*;
     325              :     use crate::context::DownloadBehavior;
     326              :     use crate::task_mgr::TaskKind;
     327              :     use std::fs;
     328              :     use std::str::FromStr;
     329              : 
     330            8 :     fn harness(
     331            8 :         test_name: &str,
     332            8 :     ) -> Result<
     333            8 :         (
     334            8 :             &'static PageServerConf,
     335            8 :             TenantShardId,
     336            8 :             TimelineId,
     337            8 :             RequestContext,
     338            8 :         ),
     339            8 :         io::Error,
     340            8 :     > {
     341            8 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     342            8 :         let _ = fs::remove_dir_all(&repo_dir);
     343            8 :         let conf = PageServerConf::dummy_conf(repo_dir);
     344            8 :         // Make a static copy of the config. This can never be free'd, but that's
     345            8 :         // OK in a test.
     346            8 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     347            8 : 
     348            8 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     349            8 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     350            8 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     351              : 
     352            8 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     353            8 : 
     354            8 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     355            8 :     }
     356              : 
     357              :     #[tokio::test]
     358            2 :     async fn ephemeral_file_holds_gate_open() {
     359            2 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     360            2 : 
     361            2 :         let (conf, tenant_id, timeline_id, ctx) =
     362            2 :             harness("ephemeral_file_holds_gate_open").unwrap();
     363            2 : 
     364            2 :         let gate = utils::sync::gate::Gate::default();
     365            2 : 
     366            2 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
     367            2 :             .await
     368            2 :             .unwrap();
     369            2 : 
     370            2 :         let mut closing = tokio::task::spawn(async move {
     371            2 :             gate.close().await;
     372            2 :         });
     373            2 : 
     374            2 :         // gate is entered until the ephemeral file is dropped
     375            2 :         // do not start paused tokio-epoll-uring has a sleep loop
     376            2 :         tokio::time::pause();
     377            2 :         tokio::time::timeout(FOREVER, &mut closing)
     378            2 :             .await
     379            2 :             .expect_err("closing cannot complete before dropping");
     380            2 : 
     381            2 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     382            2 :         // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
     383            2 :         drop(file);
     384            2 : 
     385            2 :         tokio::time::timeout(FOREVER, &mut closing)
     386            2 :             .await
     387            2 :             .expect("closing completes right away")
     388            2 :             .expect("closing does not panic");
     389            2 :     }
     390              : 
     391              :     #[tokio::test]
     392            2 :     async fn test_ephemeral_file_basics() {
     393            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     394            2 : 
     395            2 :         let gate = utils::sync::gate::Gate::default();
     396            2 : 
     397            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
     398            2 :             .await
     399            2 :             .unwrap();
     400            2 : 
     401            2 :         let mutable = file.buffered_writer.inspect_mutable();
     402            2 :         let cap = mutable.capacity();
     403            2 :         let align = mutable.align();
     404            2 : 
     405            2 :         let write_nbytes = cap * 2 + cap / 2;
     406            2 : 
     407            2 :         let content: Vec<u8> = rand::thread_rng()
     408            2 :             .sample_iter(rand::distributions::Standard)
     409            2 :             .take(write_nbytes)
     410            2 :             .collect();
     411            2 : 
     412            2 :         let mut value_offsets = Vec::new();
     413          640 :         for range in (0..write_nbytes)
     414            2 :             .step_by(align)
     415          640 :             .map(|start| start..(start + align).min(write_nbytes))
     416            2 :         {
     417          640 :             let off = file.write_raw(&content[range], &ctx).await.unwrap();
     418          640 :             value_offsets.push(off);
     419            2 :         }
     420            2 : 
     421            2 :         assert_eq!(file.len() as usize, write_nbytes);
     422          640 :         for (i, range) in (0..write_nbytes)
     423            2 :             .step_by(align)
     424          640 :             .map(|start| start..(start + align).min(write_nbytes))
     425            2 :             .enumerate()
     426            2 :         {
     427          640 :             assert_eq!(value_offsets[i], range.start.into_u64());
     428          640 :             let buf = IoBufferMut::with_capacity(range.len());
     429          640 :             let (buf_slice, nread) = file
     430          640 :                 .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
     431          640 :                 .await
     432          640 :                 .unwrap();
     433          640 :             let buf = buf_slice.into_inner();
     434          640 :             assert_eq!(nread, range.len());
     435          640 :             assert_eq!(&buf, &content[range]);
     436            2 :         }
     437            2 : 
     438            2 :         let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
     439            2 :         assert!(file_contents == content[0..cap * 2]);
     440            2 : 
     441            2 :         let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
     442            2 :         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
     443            2 : 
     444            2 :         let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
     445            2 :         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     446            2 :     }
     447              : 
     448              :     #[tokio::test]
     449            2 :     async fn test_flushes_do_happen() {
     450            2 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     451            2 : 
     452            2 :         let gate = utils::sync::gate::Gate::default();
     453            2 : 
     454            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
     455            2 :             .await
     456            2 :             .unwrap();
     457            2 : 
     458            2 :         // mutable buffer and maybe_flushed buffer each has `cap` bytes.
     459            2 :         let cap = file.buffered_writer.inspect_mutable().capacity();
     460            2 : 
     461            2 :         let content: Vec<u8> = rand::thread_rng()
     462            2 :             .sample_iter(rand::distributions::Standard)
     463            2 :             .take(cap * 2 + cap / 2)
     464            2 :             .collect();
     465            2 : 
     466            2 :         file.write_raw(&content, &ctx).await.unwrap();
     467            2 : 
     468            2 :         // assert the state is as this test expects it to be
     469            2 :         assert_eq!(
     470            2 :             &file.load_to_io_buf(&ctx).await.unwrap(),
     471            2 :             &content[0..cap * 2 + cap / 2]
     472            2 :         );
     473            2 :         let md = file.buffered_writer.as_inner().path().metadata().unwrap();
     474            2 :         assert_eq!(
     475            2 :             md.len(),
     476            2 :             2 * cap.into_u64(),
     477            2 :             "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
     478            2 :         );
     479            2 :         assert_eq!(
     480            2 :             &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
     481            2 :             &content[cap..cap * 2]
     482            2 :         );
     483            2 :         assert_eq!(
     484            2 :             &file.buffered_writer.inspect_mutable()[0..cap / 2],
     485            2 :             &content[cap * 2..cap * 2 + cap / 2]
     486            2 :         );
     487            2 :     }
     488              : 
     489              :     #[tokio::test]
     490            2 :     async fn test_read_split_across_file_and_buffer() {
     491            2 :         // This test exercises the logic on the read path that splits the logical read
     492            2 :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     493            2 :         //
     494            2 :         // This test build on the assertions in test_flushes_do_happen
     495            2 : 
     496            2 :         let (conf, tenant_id, timeline_id, ctx) =
     497            2 :             harness("test_read_split_across_file_and_buffer").unwrap();
     498            2 : 
     499            2 :         let gate = utils::sync::gate::Gate::default();
     500            2 : 
     501            2 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
     502            2 :             .await
     503            2 :             .unwrap();
     504            2 : 
     505            2 :         let mutable = file.buffered_writer.inspect_mutable();
     506            2 :         let cap = mutable.capacity();
     507            2 :         let align = mutable.align();
     508            2 :         let content: Vec<u8> = rand::thread_rng()
     509            2 :             .sample_iter(rand::distributions::Standard)
     510            2 :             .take(cap * 2 + cap / 2)
     511            2 :             .collect();
     512            2 : 
     513            2 :         let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
     514            2 : 
     515           54 :         let test_read = |start: usize, len: usize| {
     516           54 :             let file = &file;
     517           54 :             let ctx = &ctx;
     518           54 :             let content = &content;
     519           54 :             async move {
     520           54 :                 let (buf, nread) = file
     521           54 :                     .read_exact_at_eof_ok(
     522           54 :                         start.into_u64(),
     523           54 :                         IoBufferMut::with_capacity(len).slice_full(),
     524           54 :                         ctx,
     525           54 :                     )
     526           54 :                     .await
     527           54 :                     .unwrap();
     528           54 :                 assert_eq!(nread, len);
     529           54 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     530           54 :             }
     531           54 :         };
     532            2 : 
     533            6 :         let test_read_all_offset_combinations = || {
     534            6 :             async move {
     535            6 :                 test_read(align, align).await;
     536            2 :                 // border onto edge of file
     537            6 :                 test_read(cap - align, align).await;
     538            2 :                 // read across file and buffer
     539            6 :                 test_read(cap - align, 2 * align).await;
     540            2 :                 // stay from start of maybe flushed buffer
     541            6 :                 test_read(cap, align).await;
     542            2 :                 // completely within maybe flushed buffer
     543            6 :                 test_read(cap + align, align).await;
     544            2 :                 // border onto edge of maybe flushed buffer.
     545            6 :                 test_read(cap * 2 - align, align).await;
     546            2 :                 // read across maybe flushed and mutable buffer
     547            6 :                 test_read(cap * 2 - align, 2 * align).await;
     548            2 :                 // read across three segments
     549            6 :                 test_read(cap - align, cap + 2 * align).await;
     550            2 :                 // completely within mutable buffer
     551            6 :                 test_read(cap * 2 + align, align).await;
     552            6 :             }
     553            6 :         };
     554            2 : 
     555            2 :         // completely within the file range
     556            2 :         assert!(align < cap, "test assumption");
     557            2 :         assert!(cap % align == 0);
     558            2 : 
     559            2 :         // test reads at different flush stages.
     560            2 :         let not_started = control.unwrap().into_not_started();
     561            2 :         test_read_all_offset_combinations().await;
     562            2 :         let in_progress = not_started.ready_to_flush();
     563            2 :         test_read_all_offset_combinations().await;
     564            2 :         in_progress.wait_until_flush_is_done().await;
     565            2 :         test_read_all_offset_combinations().await;
     566            2 :     }
     567              : }
        

Generated by: LCOV version 2.1-beta