LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 95.8 % 456 437
Test Date: 2025-04-24 20:31:15 Functions: 89.2 % 37 33

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use std::io;
       5              : use std::sync::Arc;
       6              : use std::sync::atomic::AtomicU64;
       7              : 
       8              : use camino::Utf8PathBuf;
       9              : use num_traits::Num;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use tokio_epoll_uring::{BoundedBuf, Slice};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{error, info_span};
      14              : use utils::id::TimelineId;
      15              : use utils::sync::gate::GateGuard;
      16              : 
      17              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
      18              : use crate::config::PageServerConf;
      19              : use crate::context::RequestContext;
      20              : use crate::page_cache;
      21              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
      22              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      23              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      24              : use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
      25              : use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
      26              : 
      27              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      28              : 
      29              : pub struct EphemeralFile {
                            // Append-only file: writes go through `buffered_writer`; reads are served from
                            // `file` plus the writer's in-memory buffers.

                            /// Recorded at creation. NOTE(review): not read by the methods visible in this file.
      30              :     _tenant_shard_id: TenantShardId,
                            /// Recorded at creation. NOTE(review): not read by the methods visible in this file.
      31              :     _timeline_id: TimelineId,
                            /// Id obtained from `page_cache::next_file_id()` in `create`.
      32              :     page_cache_file_id: page_cache::FileId,
                            /// Logical length: total bytes accepted by the write path, whether or not they
                            /// have reached the on-disk file yet.
      33              :     bytes_written: u64,
                            /// Handle to the temp file, shared with the buffered writer; used for reads.
      34              :     file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
                            /// Write path; also inspected on reads for bytes that may not be on disk yet.
      35              :     buffered_writer: BufferedWriter,
      36              : }
      37              : 
      38              : type BufferedWriter = owned_buffers_io::write::BufferedWriter<
                            // Buffer type; `EphemeralFile::create` supplies buffers of `TAIL_SZ` capacity.
      39              :     IoBufferMut,
                            // Writer target; implements `OwnedAsyncWriter` below.
      40              :     TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
      41              : >;
      42              : 
      43              : /// A TempVirtualFile that is co-owned by the [`EphemeralFile`] and [`BufferedWriter`].
      44              : ///
      45              : /// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
      46              : /// The co-ownership is between [`EphemeralFile`] and that flush task.)
      47              : ///
      48              : /// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
      49              : #[derive(Debug, Clone)]
      50              : struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
                            /// Shared handle; cloning this struct only clones the `Arc`.
      51              :     inner: Arc<TempVirtualFile>,
      52              : }
      53              : 
      54              : const TAIL_SZ: usize = 64 * 1024; // Capacity of each write buffer; the read path also treats the maybe-flushed buffer as the `TAIL_SZ` bytes ending at the submitted offset.
      55              : 
      56              : impl EphemeralFile {
                            /// Creates a new, empty ephemeral file in the timeline directory.
                            ///
                            /// The file is named `ephemeral-{N}` under `conf.timeline_path(..)`, where `N` is taken
                            /// from a process-wide counter so that a name is not reused within this process.
                            ///
                            /// `gate` is entered twice: one guard goes to the temp file, the other to the buffered
                            /// writer. The writer receives a child token of `cancel`.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if opening the file fails or if `gate.enter()` fails.
      57         7956 :     pub async fn create(
      58         7956 :         conf: &PageServerConf,
      59         7956 :         tenant_shard_id: TenantShardId,
      60         7956 :         timeline_id: TimelineId,
      61         7956 :         gate: &utils::sync::gate::Gate,
      62         7956 :         cancel: &CancellationToken,
      63         7956 :         ctx: &RequestContext,
      64         7956 :     ) -> anyhow::Result<EphemeralFile> {
      65              :         // TempVirtualFile requires us to never reuse a filename while an old
      66              :         // instance of TempVirtualFile created with that filename is not done dropping yet.
      67              :         // So, we use a monotonic counter to disambiguate the filenames.
      68              :         static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
      69         7956 :         let filename_disambiguator =
      70         7956 :             NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      71         7956 : 
      72         7956 :         let filename = conf
      73         7956 :             .timeline_path(&tenant_shard_id, &timeline_id)
      74         7956 :             .join(Utf8PathBuf::from(format!(
      75         7956 :                 "ephemeral-{filename_disambiguator}"
      76         7956 :             )));
      77              : 
                                // NOTE(review): `create_new(true)` presumably fails if the path already exists,
                                // like the std option of the same name — confirm in `virtual_file::OpenOptions`.
      78         7956 :         let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
      79         7956 :             VirtualFile::open_with_options_v2(
      80         7956 :                 &filename,
      81         7956 :                 virtual_file::OpenOptions::new()
      82         7956 :                     .create_new(true)
      83         7956 :                     .read(true)
      84         7956 :                     .write(true),
      85         7956 :                 ctx,
      86         7956 :             )
      87         7956 :             .await?,
      88         7956 :             gate.enter()?,
      89              :         );
      90              : 
      91         7956 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      92         7956 : 
      93         7956 :         Ok(EphemeralFile {
      94         7956 :             _tenant_shard_id: tenant_shard_id,
      95         7956 :             _timeline_id: timeline_id,
      96         7956 :             page_cache_file_id,
      97         7956 :             bytes_written: 0,
                                    // One clone stays with the `EphemeralFile` for reads; the other moves into the writer.
      98         7956 :             file: file.clone(),
      99         7956 :             buffered_writer: BufferedWriter::new(
     100         7956 :                 file,
     101              :                 0, // NOTE(review): presumably the writer's starting offset — confirm against `BufferedWriter::new`
     102        15912 :                 || IoBufferMut::with_capacity(TAIL_SZ),
     103         7956 :                 gate.enter()?,
     104         7956 :                 cancel.child_token(),
     105         7956 :                 ctx,
     106         7956 :                 info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
     107              :             ),
     108              :         })
     109         7956 :     }
     110              : }
     111              : 
     112              : impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
                            /// Wraps `file` and `gate_guard` in a `TempVirtualFile` behind an `Arc` so the
                            /// result can be cloned and shared.
     113         7956 :     fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
     114         7956 :         Self {
     115         7956 :             inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
     116         7956 :         }
     117         7956 :     }
     118              : }
     119              : 
     120              : impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
                            // Both methods forward unchanged to the shared `TempVirtualFile`.

                            /// Writes all of `buf` at `offset` by delegating to the inner file; the buffer is
                            /// handed back alongside the I/O result.
     121        39840 :     fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
     122        39840 :         &self,
     123        39840 :         buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
     124        39840 :         offset: u64,
     125        39840 :         ctx: &RequestContext,
     126        39840 :     ) -> impl std::future::Future<
     127        39840 :         Output = (
     128        39840 :             owned_buffers_io::io_buf_ext::FullSlice<Buf>,
     129        39840 :             std::io::Result<()>,
     130        39840 :         ),
     131        39840 :     > + Send {
     132        39840 :         self.inner.write_all_at(buf, offset, ctx)
     133        39840 :     }
     134              : 
                            /// Delegates `set_len(len)` to the inner file.
     135            0 :     fn set_len(
     136            0 :         &self,
     137            0 :         len: u64,
     138            0 :         ctx: &RequestContext,
     139            0 :     ) -> impl Future<Output = std::io::Result<()>> + Send {
     140            0 :         self.inner.set_len(len, ctx)
     141            0 :     }
     142              : }
     143              : 
     144              : impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
                            // Exposes the `VirtualFile` API (e.g. `read_exact_at`, `path`) directly on the wrapper.
     145              :     type Target = VirtualFile;
     146              : 
     147       223847 :     fn deref(&self) -> &Self::Target {
                                // `&Arc<TempVirtualFile>` is coerced to `&VirtualFile` via deref coercion.
     148       223847 :         &self.inner
     149       223847 :     }
     150              : }
     151              : 
     152              : #[derive(Debug, thiserror::Error)]
                        /// Errors returned by the write path of [`EphemeralFile`].
     153              : pub(crate) enum EphemeralFileWriteError {
                            /// The write would push the logical length past `u64::MAX`; carries a description.
     154              :     #[error("{0}")]
     155              :     TooLong(String),
                            /// The buffered writer reported `FlushTaskError::Cancelled`.
     156              :     #[error("cancelled")]
     157              :     Cancelled,
     158              : }
     159              : 
     160              : impl EphemeralFile {
                            /// Logical length in bytes: everything accepted by the write path so far,
                            /// whether or not it has reached the on-disk file.
     161     57678888 :     pub(crate) fn len(&self) -> u64 {
     162     57678888 :         self.bytes_written
     163     57678888 :     }
     164              : 
                            /// Returns the page cache file id assigned in `create`.
     165         7908 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     166         7908 :         self.page_cache_file_id
     167         7908 :     }
     168              : 
                            /// Reads the entire logical contents (offset 0 up to `len()`) into a freshly
                            /// allocated buffer.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the read returns a different number of bytes than `len()`, or if the
                            /// buffer's capacity changed along the way.
                            ///
                            /// # Errors
                            ///
                            /// Propagates I/O errors from `read_exact_at_eof_ok`.
     169         5820 :     pub(crate) async fn load_to_io_buf(
     170         5820 :         &self,
     171         5820 :         ctx: &RequestContext,
     172         5820 :     ) -> Result<IoBufferMut, io::Error> {
     173         5820 :         let size = self.len().into_usize();
     174         5820 :         let buf = IoBufferMut::with_capacity(size);
     175         5820 :         let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
     176         5820 :         assert_eq!(nread, size);
     177         5820 :         let buf = slice.into_inner();
     178         5820 :         assert_eq!(buf.len(), nread);
     179         5820 :         assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
     180         5820 :         Ok(buf)
     181         5820 :     }
     182              : 
     183              :     /// Returns the offset at which the first byte of the input was written, for use
     184              :     /// in constructing indices over the written value.
     185              :     ///
     186              :     /// Panics if the write is short because there's no way we can recover from that.
     187              :     /// TODO: make upstack handle this as an error.
     188     28829352 :     pub(crate) async fn write_raw(
     189     28829352 :         &mut self,
     190     28829352 :         srcbuf: &[u8],
     191     28829352 :         ctx: &RequestContext,
     192     28829352 :     ) -> Result<u64, EphemeralFileWriteError> {
     193     28829352 :         let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
                                // If the writer handed back a flush control handle, release it right away.
                                // NOTE(review): presumably this lets the pending flush proceed — confirm in `FlushControl`.
     194     28829352 :         if let Some(control) = control {
     195        33228 :             control.release().await;
     196     28796124 :         }
     197     28829352 :         Ok(pos)
     198     28829352 :     }
     199              : 
                            /// Appends `srcbuf` and returns its start offset together with the optional flush
                            /// control handle from the buffered writer, which the caller must deal with.
                            ///
                            /// # Errors
                            ///
                            /// `TooLong` if the new length would overflow `u64`; `Cancelled` if the buffered
                            /// writer reports cancellation. In both cases `bytes_written` is left unchanged.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the buffered writer reports fewer bytes written than `srcbuf.len()`.
     200     28829364 :     async fn write_raw_controlled(
     201     28829364 :         &mut self,
     202     28829364 :         srcbuf: &[u8],
     203     28829364 :         ctx: &RequestContext,
     204     28829364 :     ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
     205     28829364 :         let pos = self.bytes_written;
     206              : 
                                // Compute the new length up front so an overflow is rejected before any bytes are buffered.
     207     28829364 :         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
     208            0 :             EphemeralFileWriteError::TooLong(format!(
     209            0 :                 "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
     210            0 :                 srcbuf_len = srcbuf.len(),
     211            0 :             ))
     212     28829364 :         })?;
     213              : 
     214              :         // Write the payload
     215     28829364 :         let (nwritten, control) = self
     216     28829364 :             .buffered_writer
     217     28829364 :             .write_buffered_borrowed_controlled(srcbuf, ctx)
     218     28829364 :             .await
     219     28829364 :             .map_err(|e| match e {
     220            0 :                 FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
     221     28829364 :             })?;
     222     28829364 :         assert_eq!(
     223     28829364 :             nwritten,
     224     28829364 :             srcbuf.len(),
     225            0 :             "buffered writer has no short writes"
     226              :         );
     227              : 
                                // Only advance the logical length once the write has been accepted.
     228     28829364 :         self.bytes_written = new_bytes_written;
     229     28829364 : 
     230     28829364 :         Ok((pos, control))
     231     28829364 :     }
     232              : }
     233              : 
     234              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
                            /// Reads up to `dst.bytes_total()` bytes starting at logical offset `start`,
                            /// stopping at the current logical end of file (`bytes_written`).
                            ///
                            /// The result is assembled from up to three sources, in this order: the on-disk
                            /// file, the writer's maybe-flushed buffer, and the writer's mutable buffer.
                            ///
                            /// Returns `dst` with its original bounds restored, plus the number of bytes read.
                            ///
                            /// NOTE(review): the returned count is `end - start`, which assumes
                            /// `start <= bytes_written`; a `start` past the end would underflow. Confirm that
                            /// callers uphold this.
     235      3179205 :     async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
     236      3179205 :         &self,
     237      3179205 :         start: u64,
     238      3179205 :         dst: tokio_epoll_uring::Slice<B>,
     239      3179205 :         ctx: &RequestContext,
     240      3179205 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
     241      3179205 :         let submitted_offset = self.buffered_writer.bytes_submitted();
     242              : 
     243      3179205 :         let mutable = match self.buffered_writer.inspect_mutable() {
     244      3179205 :             Some(mutable) => &mutable[0..mutable.pending()],
     245              :             None => {
     246              :                 // Timeline::cancel and hence buffered writer flush was cancelled.
     247              :                 // Remain read-available while timeline is shutting down.
     248            0 :                 &[]
     249              :             }
     250              :         };
     251              : 
     252      3179205 :         let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
     253      3179205 : 
     254      3179205 :         let dst_cap = dst.bytes_total().into_u64();
                                // Exclusive end of the logical read, clamped to the logical file length.
     255      3179205 :         let end = {
     256              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     257              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     258      3179205 :             let mut end: u64 = start.saturating_add(dst_cap);
     259      3179205 :             if end > self.bytes_written {
     260      1664257 :                 end = self.bytes_written;
     261      1664257 :             }
     262      3179205 :             end
     263              :         };
     264              : 
                                // Half-open range; `len()` treats a reversed range (start > end) as empty.
     265              :         // inclusive, exclusive
     266              :         #[derive(Debug)]
     267              :         struct Range<N>(N, N);
     268              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     269     20649276 :             fn len(&self) -> N {
     270     20649276 :                 if self.0 > self.1 {
     271     11048944 :                     N::zero()
     272              :                 } else {
     273      9600332 :                     self.1 - self.0
     274              :                 }
     275     20649276 :             }
     276              :         }
     277              : 
                                // Split the logical range [start, end) into the part read from the file and
                                // the part copied from the maybe-flushed buffer.
     278      3179205 :         let (written_range, maybe_flushed_range) = {
     279      3179205 :             if maybe_flushed.is_some() {
     280              :                 // [       written       ][ maybe_flushed ][    mutable    ]
     281              :                 //                                         ^
     282              :                 //                                 `submitted_offset`
     283              :                 // <++++++ on disk +++++++????????????????>
     284      3116461 :                 (
     285      3116461 :                     Range(
     286      3116461 :                         start,
     287      3116461 :                         std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     288      3116461 :                     ),
     289      3116461 :                     Range(
     290      3116461 :                         std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     291      3116461 :                         std::cmp::min(end, submitted_offset),
     292      3116461 :                     ),
     293      3116461 :                 )
     294              :             } else {
     295              :                 // [       written                        ][    mutable    ]
     296              :                 //                                         ^
     297              :                 //                                 `submitted_offset`
     298              :                 // <++++++ on disk +++++++++++++++++++++++>
     299        62744 :                 (
     300        62744 :                     Range(start, std::cmp::min(end, submitted_offset)),
     301        62744 :                     // zero len
     302        62744 :                     Range(submitted_offset, u64::MIN),
     303        62744 :                 )
     304              :             }
     305              :         };
     306              : 
     307      3179205 :         let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
     308              : 
                                // Part 1: bytes served from the on-disk file, placed at the front of `dst`.
     309      3179205 :         let dst = if written_range.len() > 0 {
     310       223823 :             let bounds = dst.bounds();
     311       223823 :             let slice = self
     312       223823 :                 .file
     313       223823 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     314       223823 :                 .await?;
                                    // Re-widen to the caller's original bounds for the following steps.
     315       223823 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     316              :         } else {
     317      2955382 :             dst
     318              :         };
     319              : 
                                // Part 2: bytes copied from the maybe-flushed buffer, placed after part 1.
     320      3179205 :         let dst = if maybe_flushed_range.len() > 0 {
     321       980578 :             let offset_in_buffer = maybe_flushed_range
     322       980578 :                 .0
     323       980578 :                 .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
     324       980578 :                 .unwrap()
     325       980578 :                 .into_usize();
     326       980578 :             // Checked previously the buffer is Some.
     327       980578 :             let maybe_flushed = maybe_flushed.unwrap();
     328       980578 :             let to_copy = &maybe_flushed
     329       980578 :                 [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
     330       980578 :             let bounds = dst.bounds();
     331       980578 :             let mut view = dst.slice({
     332       980578 :                 let start = written_range.len().into_usize();
     333       980578 :                 let end = start
     334       980578 :                     .checked_add(maybe_flushed_range.len().into_usize())
     335       980578 :                     .unwrap();
     336       980578 :                 start..end
     337       980578 :             });
     338       980578 :             view.as_mut_rust_slice_full_zeroed()
     339       980578 :                 .copy_from_slice(to_copy);
     340       980578 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     341              :         } else {
     342      2198627 :             dst
     343              :         };
     344              : 
                                // Part 3: bytes copied from the mutable buffer, placed after parts 1 and 2.
     345      3179205 :         let dst = if mutable_range.len() > 0 {
     346      1986526 :             let offset_in_buffer = mutable_range
     347      1986526 :                 .0
     348      1986526 :                 .checked_sub(submitted_offset)
     349      1986526 :                 .unwrap()
     350      1986526 :                 .into_usize();
     351      1986526 :             let to_copy =
     352      1986526 :                 &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
     353      1986526 :             let bounds = dst.bounds();
     354      1986526 :             let mut view = dst.slice({
     355      1986526 :                 let start =
     356      1986526 :                     written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
     357      1986526 :                 let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
     358      1986526 :                 start..end
     359      1986526 :             });
     360      1986526 :             view.as_mut_rust_slice_full_zeroed()
     361      1986526 :                 .copy_from_slice(to_copy);
     362      1986526 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     363              :         } else {
     364      1192679 :             dst
     365              :         };
     366              : 
     367              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     368              : 
     369      3179205 :         Ok((dst, (end - start).into_usize()))
     370      3179205 :     }
     371              : }
     372              : 
     373              : /// Does the given filename look like an ephemeral file?
                        ///
                        /// True when the name is `ephemeral-` followed by text that parses as a `u32`.
                        ///
                        /// NOTE(review): `EphemeralFile::create` formats a `u64` counter into the name, so a
                        /// suffix above `u32::MAX` would not be recognized here — confirm whether that matters.
     374            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     375            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     376            0 :         rest.parse::<u32>().is_ok()
     377              :     } else {
     378            0 :         false
     379              :     }
     380            0 : }
     381              : 
     382              : #[cfg(test)]
     383              : mod tests {
     384              :     use std::fs;
     385              :     use std::str::FromStr;
     386              : 
     387              :     use rand::Rng;
     388              : 
     389              :     use super::*;
     390              :     use crate::context::DownloadBehavior;
     391              :     use crate::task_mgr::TaskKind;
     392              : 
     393           48 :     fn harness(
                                // Test fixture: wipes and recreates the repo dir for `test_name`, builds a leaked
                                // `'static` dummy config, creates the timeline directory for fixed tenant/timeline
                                // ids, and returns those together with a unit-test request context.
     394           48 :         test_name: &str,
     395           48 :     ) -> Result<
     396           48 :         (
     397           48 :             &'static PageServerConf,
     398           48 :             TenantShardId,
     399           48 :             TimelineId,
     400           48 :             RequestContext,
     401           48 :         ),
     402           48 :         io::Error,
     403           48 :     > {
     404           48 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
                                // Removal errors are ignored on purpose (e.g. the directory may not exist yet).
     405           48 :         let _ = fs::remove_dir_all(&repo_dir);
     406           48 :         let conf = PageServerConf::dummy_conf(repo_dir);
     407           48 :         // Make a static copy of the config. This can never be free'd, but that's
     408           48 :         // OK in a test.
     409           48 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     410           48 : 
     411           48 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     412           48 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     413           48 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     414              : 
     415           48 :         let ctx =
     416           48 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     417           48 : 
     418           48 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     419           48 :     }
     420              : 
     421              :     #[tokio::test]
                            /// Closing the gate must not complete while an `EphemeralFile` is alive, and must
                            /// complete once the file has been dropped.
     422           12 :     async fn ephemeral_file_holds_gate_open() {
                                // Timeout used with paused tokio time; "forever" only from the test's point of view.
     423           12 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     424           12 : 
     425           12 :         let (conf, tenant_id, timeline_id, ctx) =
     426           12 :             harness("ephemeral_file_holds_gate_open").unwrap();
     427           12 : 
     428           12 :         let gate = utils::sync::gate::Gate::default();
     429           12 :         let cancel = CancellationToken::new();
     430           12 : 
     431           12 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     432           12 :             .await
     433           12 :             .unwrap();
     434           12 : 
     435           12 :         let mut closing = tokio::task::spawn(async move {
     436           12 :             gate.close().await;
     437           12 :         });
     438           12 : 
     439           12 :         // gate is entered until the ephemeral file is dropped
     440           12 :         // time is paused only from here on (not from test start): tokio-epoll-uring has a sleep loop
     441           12 :         tokio::time::pause();
     442           12 :         tokio::time::timeout(FOREVER, &mut closing)
     443           12 :             .await
     444           12 :             .expect_err("closing cannot complete before dropping");
     445           12 : 
     446           12 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     447           12 :         // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
     448           12 :         drop(file);
     449           12 : 
     450           12 :         tokio::time::timeout(FOREVER, &mut closing)
     451           12 :             .await
     452           12 :             .expect("closing completes right away")
     453           12 :             .expect("closing does not panic");
     454           12 :     }
     455              : 
     456              :     #[tokio::test]
                            /// Writes 2.5 buffer capacities of random bytes in `align`-sized pieces, then checks
                            /// the returned offsets, reads of every piece, the on-disk contents, and the contents
                            /// of both writer buffers.
     457           12 :     async fn test_ephemeral_file_basics() {
     458           12 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     459           12 : 
     460           12 :         let gate = utils::sync::gate::Gate::default();
     461           12 :         let cancel = CancellationToken::new();
     462           12 : 
     463           12 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     464           12 :             .await
     465           12 :             .unwrap();
     466           12 : 
     467           12 :         let mutable = file.buffered_writer.mutable();
     468           12 :         let cap = mutable.capacity();
     469           12 :         let align = mutable.align();
     470           12 : 
     471           12 :         let write_nbytes = cap * 2 + cap / 2;
     472           12 : 
     473           12 :         let content: Vec<u8> = rand::thread_rng()
     474           12 :             .sample_iter(rand::distributions::Standard)
     475           12 :             .take(write_nbytes)
     476           12 :             .collect();
     477           12 : 
                                // Write the content piece by piece, remembering the offset reported for each piece.
     478           12 :         let mut value_offsets = Vec::new();
     479         3840 :         for range in (0..write_nbytes)
     480           12 :             .step_by(align)
     481         3840 :             .map(|start| start..(start + align).min(write_nbytes))
     482           12 :         {
     483         3840 :             let off = file.write_raw(&content[range], &ctx).await.unwrap();
     484         3840 :             value_offsets.push(off);
     485           12 :         }
     486           12 : 
     487           12 :         assert_eq!(file.len() as usize, write_nbytes);
                                // Read every piece back through the public read path and compare.
     488         3840 :         for (i, range) in (0..write_nbytes)
     489           12 :             .step_by(align)
     490         3840 :             .map(|start| start..(start + align).min(write_nbytes))
     491           12 :             .enumerate()
     492           12 :         {
     493         3840 :             assert_eq!(value_offsets[i], range.start.into_u64());
     494         3840 :             let buf = IoBufferMut::with_capacity(range.len());
     495         3840 :             let (buf_slice, nread) = file
     496         3840 :                 .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
     497         3840 :                 .await
     498         3840 :                 .unwrap();
     499         3840 :             let buf = buf_slice.into_inner();
     500         3840 :             assert_eq!(nread, range.len());
     501         3840 :             assert_eq!(&buf, &content[range]);
     502           12 :         }
     503           12 : 
                                // Expected state: two full buffers on disk, the second one also still held as the
                                // maybe-flushed buffer, and the remaining half buffer in the mutable buffer.
     504           12 :         let file_contents = std::fs::read(file.file.path()).unwrap();
     505           12 :         assert!(file_contents == content[0..cap * 2]);
     506           12 : 
     507           12 :         let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
     508           12 :         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
     509           12 : 
     510           12 :         let mutable_buffer_contents = file.buffered_writer.mutable();
     511           12 :         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     512           12 :     }
     513              : 
     514              :     #[tokio::test]
                            /// Writes 2.5 buffer capacities in a single call and checks that exactly two
                            /// buffers' worth of bytes are in the on-disk file, with the rest in the writer's
                            /// buffers.
     515           12 :     async fn test_flushes_do_happen() {
     516           12 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     517           12 : 
     518           12 :         let gate = utils::sync::gate::Gate::default();
     519           12 :         let cancel = CancellationToken::new();
     520           12 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     521           12 :             .await
     522           12 :             .unwrap();
     523           12 : 
     524           12 :         // mutable buffer and maybe_flushed buffer each has `cap` bytes.
     525           12 :         let cap = file.buffered_writer.mutable().capacity();
     526           12 : 
     527           12 :         let content: Vec<u8> = rand::thread_rng()
     528           12 :             .sample_iter(rand::distributions::Standard)
     529           12 :             .take(cap * 2 + cap / 2)
     530           12 :             .collect();
     531           12 : 
     532           12 :         file.write_raw(&content, &ctx).await.unwrap();
     533           12 : 
     534           12 :         // assert the state is as this test expects it to be
     535           12 :         let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
     536           12 :         assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
     537           12 :         let md = file.file.path().metadata().unwrap();
     538           12 :         assert_eq!(
     539           12 :             md.len(),
     540           12 :             2 * cap.into_u64(),
     541           12 :             "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
     542           12 :         );
     543           12 :         assert_eq!(
     544           12 :             &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
     545           12 :             &content[cap..cap * 2]
     546           12 :         );
     547           12 :         assert_eq!(
     548           12 :             &file.buffered_writer.mutable()[0..cap / 2],
     549           12 :             &content[cap * 2..cap * 2 + cap / 2]
     550           12 :         );
     551           12 :     }
     552              : 
     553              :     #[tokio::test]
     554           12 :     async fn test_read_split_across_file_and_buffer() {
     555           12 :         // This test exercises the logic on the read path that splits the logical read
     556           12 :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     557           12 :         //
     558           12 :         // This test builds on the assertions in test_flushes_do_happen.
     559           12 : 
     560           12 :         let (conf, tenant_id, timeline_id, ctx) =
     561           12 :             harness("test_read_split_across_file_and_buffer").unwrap();
     562           12 : 
     563           12 :         let gate = utils::sync::gate::Gate::default();
     564           12 :         let cancel = CancellationToken::new();
     565           12 : 
     566           12 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     567           12 :             .await
     568           12 :             .unwrap();
     569           12 : 
                                // `cap` is the mutable buffer's capacity and `align` its alignment; all read
                                // offsets and lengths below are expressed in terms of these two values.
     570           12 :         let mutable = file.buffered_writer.mutable();
     571           12 :         let cap = mutable.capacity();
     572           12 :         let align = mutable.align();
                                // Random payload of `cap * 2 + cap / 2` bytes (same size as in test_flushes_do_happen).
     573           12 :         let content: Vec<u8> = rand::thread_rng()
     574           12 :             .sample_iter(rand::distributions::Standard)
     575           12 :             .take(cap * 2 + cap / 2)
     576           12 :             .collect();
     577           12 : 
                                // `control` is unwrapped further down and used to step the flush through its stages.
     578           12 :         let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
     579           12 : 
                                // Reads `len` bytes at offset `start` into a fresh buffer of capacity `len` and asserts
                                // that the full length was read and that the bytes equal `content[start..start + len]`.
     580          324 :         let test_read = |start: usize, len: usize| {
     581          324 :             let file = &file;
     582          324 :             let ctx = &ctx;
     583          324 :             let content = &content;
     584          324 :             async move {
     585          324 :                 let (buf, nread) = file
     586          324 :                     .read_exact_at_eof_ok(
     587          324 :                         start.into_u64(),
     588          324 :                         IoBufferMut::with_capacity(len).slice_full(),
     589          324 :                         ctx,
     590          324 :                     )
     591          324 :                     .await
     592          324 :                     .unwrap();
     593          324 :                 assert_eq!(nread, len);
     594          324 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     595          324 :             }
     596          324 :         };
     597           12 : 
                                // Runs `test_read` over a fixed set of (offset, length) pairs chosen around the
                                // `cap` and `2 * cap` boundaries, so that reads fall inside, border on, and span
                                // the three segments named in the case comments (file, maybe flushed buffer,
                                // mutable buffer).
     598           36 :         let test_read_all_offset_combinations = || {
     599           36 :             async move {
                                        // completely within the file range
     600           36 :                 test_read(align, align).await;
     601           12 :                 // border onto edge of file
     602           36 :                 test_read(cap - align, align).await;
     603           12 :                 // read across file and buffer
     604           36 :                 test_read(cap - align, 2 * align).await;
     605           12 :                 // start at the beginning of the maybe flushed buffer
     606           36 :                 test_read(cap, align).await;
     607           12 :                 // completely within maybe flushed buffer
     608           36 :                 test_read(cap + align, align).await;
     609           12 :                 // border onto edge of maybe flushed buffer.
     610           36 :                 test_read(cap * 2 - align, align).await;
     611           12 :                 // read across maybe flushed and mutable buffer
     612           36 :                 test_read(cap * 2 - align, 2 * align).await;
     613           12 :                 // read across three segments
     614           36 :                 test_read(cap - align, cap + 2 * align).await;
     615           12 :                 // completely within mutable buffer
     616           36 :                 test_read(cap * 2 + align, align).await;
     617           36 :             }
     618           36 :         };
     619           12 : 
     620           12 :         // Test assumptions: `align < cap` and `cap % align == 0` together give `cap >= 2 * align`,
                                // so the first case above (`test_read(align, align)`) ends at or before offset `cap`.
     621           12 :         assert!(align < cap, "test assumption");
     622           12 :         assert!(cap % align == 0);
     623           12 : 
     624           12 :         // Test reads at different flush stages: the same set of reads is repeated three times.
                                // 1st pass: flush control is in the "not started" state (per the method name).
     625           12 :         let not_started = control.unwrap().into_not_started();
     626           12 :         test_read_all_offset_combinations().await;
                                // 2nd pass: after `ready_to_flush()`; presumably the flush may now be in progress.
     627           12 :         let in_progress = not_started.ready_to_flush();
     628           12 :         test_read_all_offset_combinations().await;
                                // 3rd pass: after waiting for the flush to be done.
     629           12 :         in_progress.wait_until_flush_is_done().await;
     630           12 :         test_read_all_offset_combinations().await;
     631           12 :     }
     632              : }
        

Generated by: LCOV version 2.1-beta