LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions)
Test:         1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date:    2025-07-16 12:29:03
Coverage:     Lines: 94.5 % (378 of 400)    Functions: 89.2 % (33 of 37)

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
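                       : //!
                       : //! Writes go through a [`BufferedWriter`] backed by a background flush
                       : //! task; reads may be served from the writer's two in-memory buffers,
                       : //! from the flushed portion of the file, or from a combination of them.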
       3              : 
       4              : use std::io;
       5              : use std::sync::Arc;
       6              : use std::sync::atomic::{AtomicU64, Ordering};
       7              : 
       8              : use camino::Utf8PathBuf;
       9              : use num_traits::Num;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use tokio_epoll_uring::{BoundedBuf, Slice};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{error, info_span};
      14              : use utils::id::TimelineId;
      15              : use utils::sync::gate::GateGuard;
      16              : 
      17              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
      18              : use crate::config::PageServerConf;
      19              : use crate::context::RequestContext;
      20              : use crate::page_cache;
      21              : use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
      22              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
      23              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      24              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      25              : use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
      26              : use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
      27              : 
      28              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      29              : 
      30              : pub struct EphemeralFile {
      31              :     _tenant_shard_id: TenantShardId,
      32              :     _timeline_id: TimelineId,
      33              :     page_cache_file_id: page_cache::FileId,
      34              :     file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
      35              : 
      36              :     buffered_writer: tokio::sync::RwLock<BufferedWriter>,
      37              : 
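                       :     /// Logical length: every byte ever handed to [`Self::write_raw`],
                       :     /// including bytes still sitting in the writer's in-memory buffers.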
      38              :     bytes_written: AtomicU64,
      39              : 
      40              :     resource_units: std::sync::Mutex<GlobalResourceUnits>,
      41              : }
      42              : 
      43              : type BufferedWriter = owned_buffers_io::write::BufferedWriter<
      44              :     IoBufferMut,
      45              :     TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
      46              : >;
      47              : 
       48              : /// A TempVirtualFile that is co-owned by the [`EphemeralFile`] and [`BufferedWriter`].
      49              : ///
      50              : /// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
      51              : /// The co-ownership is between [`EphemeralFile`] and that flush task.)
      52              : ///
      53              : /// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
      54              : #[derive(Debug, Clone)]
      55              : struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
      56              :     inner: Arc<TempVirtualFile>,
      57              : }
      58              : 
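                       : /// Capacity of each of the buffered writer's buffers (the mutable one and
                       : /// the `maybe_flushed` one that is being written out). The read path uses
                       : /// this to locate those buffers in offset space; see `read_exact_at_eof_ok`.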
      59              : const TAIL_SZ: usize = 64 * 1024;
      60              : 
      61              : impl EphemeralFile {
      62          669 :     pub async fn create(
      63          669 :         conf: &PageServerConf,
      64          669 :         tenant_shard_id: TenantShardId,
      65          669 :         timeline_id: TimelineId,
      66          669 :         gate: &utils::sync::gate::Gate,
      67          669 :         cancel: &CancellationToken,
      68          669 :         ctx: &RequestContext,
      69          669 :     ) -> anyhow::Result<EphemeralFile> {
      70              :         // TempVirtualFile requires us to never reuse a filename while an old
      71              :         // instance of TempVirtualFile created with that filename is not done dropping yet.
      72              :         // So, we use a monotonic counter to disambiguate the filenames.
      73              :         static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
      74          669 :         let filename_disambiguator =
      75          669 :             NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      76              : 
      77          669 :         let filename = conf
      78          669 :             .timeline_path(&tenant_shard_id, &timeline_id)
      79          669 :             .join(Utf8PathBuf::from(format!(
      80          669 :                 "ephemeral-{filename_disambiguator}"
      81          669 :             )));
      82              : 
      83          669 :         let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
      84          669 :             VirtualFile::open_with_options_v2(
      85          669 :                 &filename,
      86          669 :                 virtual_file::OpenOptions::new()
      87          669 :                     .create_new(true)
      88          669 :                     .read(true)
      89          669 :                     .write(true),
      90          669 :                 ctx,
      91          669 :             )
      92          669 :             .await?,
      93          669 :             gate.enter()?,
      94              :         );
      95              : 
      96          669 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      97              : 
      98              :         Ok(EphemeralFile {
      99          669 :             _tenant_shard_id: tenant_shard_id,
     100          669 :             _timeline_id: timeline_id,
     101          669 :             page_cache_file_id,
     102          669 :             file: file.clone(),
     103          669 :             buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
     104          669 :                 file,
     105              :                 0,
     106         1338 :                 || IoBufferMut::with_capacity(TAIL_SZ),
     107          669 :                 gate.enter()?,
     108          669 :                 cancel.child_token(),
     109          669 :                 ctx,
     110          669 :                 info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
     111              :             )),
     112          669 :             bytes_written: AtomicU64::new(0),
     113          669 :             resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
     114              :         })
     115          669 :     }
     116              : }
     117              : 
     118              : impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
     119          669 :     fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
     120          669 :         Self {
     121          669 :             inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
     122          669 :         }
     123          669 :     }
     124              : }
     125              : 
     126              : impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
     127         3320 :     fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
     128         3320 :         &self,
     129         3320 :         buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
     130         3320 :         offset: u64,
     131         3320 :         ctx: &RequestContext,
     132         3320 :     ) -> impl std::future::Future<
     133         3320 :         Output = (
     134         3320 :             owned_buffers_io::io_buf_ext::FullSlice<Buf>,
     135         3320 :             std::io::Result<()>,
     136         3320 :         ),
     137         3320 :     > + Send {
     138         3320 :         self.inner.write_all_at(buf, offset, ctx)
     139         3320 :     }
     140              : 
     141            0 :     fn set_len(
     142            0 :         &self,
     143            0 :         len: u64,
     144            0 :         ctx: &RequestContext,
     145            0 :     ) -> impl Future<Output = std::io::Result<()>> + Send {
     146            0 :         self.inner.set_len(len, ctx)
     147            0 :     }
     148              : }
     149              : 
     150              : impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
     151              :     type Target = VirtualFile;
     152              : 
     153        18647 :     fn deref(&self) -> &Self::Target {
     154        18647 :         &self.inner
     155        18647 :     }
     156              : }
     157              : 
     158              : #[derive(Debug, thiserror::Error)]
     159              : pub(crate) enum EphemeralFileWriteError {
     160              :     #[error("cancelled")]
     161              :     Cancelled,
     162              : }
     163              : 
     164              : impl EphemeralFile {
     165      4806605 :     pub(crate) fn len(&self) -> u64 {
     166              :         // TODO(vlad): The value returned here is not always correct if
     167              :         // we have more than one concurrent writer. Writes are always
     168              :         // sequenced, but we could grab the buffered writer lock if we wanted
     169              :         // to.
     170      4806605 :         self.bytes_written.load(Ordering::Acquire)
     171      4806605 :     }
     172              : 
     173          665 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     174          665 :         self.page_cache_file_id
     175          665 :     }
     176              : 
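                       :     /// Loads the entire file, both the flushed and the still-buffered
                       :     /// parts, into a single buffer.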
     177          487 :     pub(crate) async fn load_to_io_buf(
     178          487 :         &self,
     179          487 :         ctx: &RequestContext,
     180          487 :     ) -> Result<IoBufferMut, io::Error> {
     181          487 :         let size = self.len().into_usize();
     182          487 :         let buf = IoBufferMut::with_capacity(size);
     183          487 :         let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
     184          487 :         assert_eq!(nread, size);
     185          487 :         let buf = slice.into_inner();
     186          487 :         assert_eq!(buf.len(), nread);
     187          487 :         assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
     188          487 :         Ok(buf)
     189          487 :     }
     190              : 
     191              :     /// Returns the offset at which the first byte of the input was written, for use
     192              :     /// in constructing indices over the written value.
     193              :     ///
     194              :     /// Panics if the write is short because there's no way we can recover from that.
     195              :     /// TODO: make upstack handle this as an error.
     196      2402453 :     pub(crate) async fn write_raw(
     197      2402453 :         &self,
     198      2402453 :         srcbuf: &[u8],
     199      2402453 :         ctx: &RequestContext,
     200      2402453 :     ) -> Result<u64, EphemeralFileWriteError> {
     201      2402453 :         let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
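                       :         // A `control` is only handed back when this write pushed a full
                       :         // buffer to the background flush task; releasing it right away lets
                       :         // the flush proceed. (Tests hold on to it to step through the flush.)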
     202      2402453 :         if let Some(control) = control {
     203         2769 :             control.release().await;
     204      2399684 :         }
     205      2402453 :         Ok(pos)
     206      2402453 :     }
     207              : 
     208      2402454 :     async fn write_raw_controlled(
     209      2402454 :         &self,
     210      2402454 :         srcbuf: &[u8],
     211      2402454 :         ctx: &RequestContext,
     212      2402454 :     ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
     213      2402454 :         let mut writer = self.buffered_writer.write().await;
     214              : 
     215      2402454 :         let (nwritten, control) = writer
     216      2402454 :             .write_buffered_borrowed_controlled(srcbuf, ctx)
     217      2402454 :             .await
     218      2402454 :             .map_err(|e| match e {
     219            0 :                 FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
     220            0 :             })?;
     221      2402454 :         assert_eq!(
     222              :             nwritten,
     223      2402454 :             srcbuf.len(),
     224            0 :             "buffered writer has no short writes"
     225              :         );
     226              : 
     227              :         // There's no realistic risk of overflow here. We won't have exabytes sized files on disk.
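                       :         // `fetch_add` returns the previous value, i.e. the file length before
                       :         // this write, which is exactly the offset of the first byte of
                       :         // `srcbuf`: writers are serialized by the `buffered_writer` lock.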
     228      2402454 :         let pos = self
     229      2402454 :             .bytes_written
     230      2402454 :             .fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
     231              : 
     232      2402454 :         let mut resource_units = self.resource_units.lock().unwrap();
     233      2402454 :         resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
     234              : 
     235      2402454 :         Ok((pos, control))
     236      2402454 :     }
     237              : 
     238            0 :     pub(crate) fn tick(&self) -> Option<u64> {
     239            0 :         let mut resource_units = self.resource_units.lock().unwrap();
     240            0 :         let len = self.bytes_written.load(Ordering::Relaxed);
     241            0 :         resource_units.publish_size(len)
     242            0 :     }
     243              : }
     244              : 
     245              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     246       262460 :     async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
     247       262460 :         &self,
     248       262460 :         start: u64,
     249       262460 :         mut dst: tokio_epoll_uring::Slice<B>,
     250       262460 :         ctx: &RequestContext,
     251       262460 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
      252              :         // We will fill the slice back to front; hence, we need
     253              :         // the slice to be fully initialized.
     254              :         // TODO(vlad): Is there a nicer way of doing this?
     255       262460 :         dst.as_mut_rust_slice_full_zeroed();
     256              : 
     257       262460 :         let writer = self.buffered_writer.read().await;
     258              : 
     259              :         // Read bytes written while under lock. This is a hack to deal with concurrent
      260              :         // writes updating the number of bytes written. `bytes_written` is not DIO aligned
     261              :         // but we may end the read there.
     262              :         //
     263              :         // TODO(vlad): Feels like there's a nicer path where we align the end if it
     264              :         // shoots over the end of the file.
     265       262460 :         let bytes_written = self.bytes_written.load(Ordering::Acquire);
     266              : 
     267       262460 :         let dst_cap = dst.bytes_total().into_u64();
     268       262460 :         let end = {
     269              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     270              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     271       262460 :             let mut end: u64 = start.saturating_add(dst_cap);
     272       262460 :             if end > bytes_written {
     273       136159 :                 end = bytes_written;
     274       136159 :             }
     275       262460 :             end
     276              :         };
     277              : 
     278       262460 :         let submitted_offset = writer.bytes_submitted();
     279       262460 :         let maybe_flushed = writer.inspect_maybe_flushed();
     280              : 
     281       262460 :         let mutable = match writer.inspect_mutable() {
     282       262460 :             Some(mutable) => &mutable[0..mutable.pending()],
     283              :             None => {
     284              :                 // Timeline::cancel and hence buffered writer flush was cancelled.
     285              :                 // Remain read-available while timeline is shutting down.
     286            0 :                 &[]
     287              :             }
     288              :         };
     289              : 
     290              :         // inclusive, exclusive
     291              :         #[derive(Debug)]
     292              :         struct Range<N>(N, N);
     293              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     294      1703392 :             fn len(&self) -> N {
     295      1703392 :                 if self.0 > self.1 {
     296       910616 :                     N::zero()
     297              :                 } else {
     298       792776 :                     self.1 - self.0
     299              :                 }
     300      1703392 :             }
     301              :         }
     302              : 
     303       262460 :         let (written_range, maybe_flushed_range) = {
     304       262460 :             if maybe_flushed.is_some() {
     305              :                 // [       written       ][ maybe_flushed ][    mutable    ]
     306              :                 //                                         ^
     307              :                 //                                 `submitted_offset`
     308              :                 // <++++++ on disk +++++++????????????????>
     309       257206 :                 (
     310       257206 :                     Range(
     311       257206 :                         start,
     312       257206 :                         std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     313       257206 :                     ),
     314       257206 :                     Range(
     315       257206 :                         std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     316       257206 :                         std::cmp::min(end, submitted_offset),
     317       257206 :                     ),
     318       257206 :                 )
     319              :             } else {
     320              :                 // [       written                        ][    mutable    ]
     321              :                 //                                         ^
     322              :                 //                                 `submitted_offset`
     323              :                 // <++++++ on disk +++++++++++++++++++++++>
     324         5254 :                 (
     325         5254 :                     Range(start, std::cmp::min(end, submitted_offset)),
     326         5254 :                     // zero len
     327         5254 :                     Range(submitted_offset, u64::MIN),
     328         5254 :                 )
     329              :             }
     330              :         };
     331              : 
     332       262460 :         let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
     333              : 
     334              :         // There are three sources from which we might have to read data:
     335              :         // 1. The file itself
     336              :         // 2. The buffer which contains changes currently being flushed
      337              :         // 3. The buffer which contains changes yet to be flushed
     338              :         //
     339              :         // For better concurrency, we do them in reverse order: perform the in-memory
     340              :         // reads while holding the writer lock, drop the writer lock and read from the
     341              :         // file if required.
     342              : 
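                       :         // `dst` is carved up to mirror offset order: bytes read from the file
                       :         // land in `dst[0..written_range.len()]`, the `maybe_flushed` copy comes
                       :         // next, and the `mutable` copy goes last.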
     343       262460 :         let dst = if mutable_range.len() > 0 {
     344       162997 :             let offset_in_buffer = mutable_range
     345       162997 :                 .0
     346       162997 :                 .checked_sub(submitted_offset)
     347       162997 :                 .unwrap()
     348       162997 :                 .into_usize();
     349       162997 :             let to_copy =
     350       162997 :                 &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
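                       :             // Save the full bounds before narrowing: `dst.slice(..)` consumes
                       :             // the slice, and we reconstitute a slice over the whole buffer from
                       :             // the saved bounds once the copy is done.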
     351       162997 :             let bounds = dst.bounds();
     352       162997 :             let mut view = dst.slice({
     353       162997 :                 let start =
     354       162997 :                     written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
     355       162997 :                 let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
     356       162997 :                 start..end
     357              :             });
     358       162997 :             view.as_mut_rust_slice_full_zeroed()
     359       162997 :                 .copy_from_slice(to_copy);
     360       162997 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     361              :         } else {
     362        99463 :             dst
     363              :         };
     364              : 
     365       262460 :         let dst = if maybe_flushed_range.len() > 0 {
     366        81793 :             let offset_in_buffer = maybe_flushed_range
     367        81793 :                 .0
     368        81793 :                 .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
     369        81793 :                 .unwrap()
     370        81793 :                 .into_usize();
      371              :             // We checked above that the buffer is `Some`.
     372        81793 :             let maybe_flushed = maybe_flushed.unwrap();
     373        81793 :             let to_copy = &maybe_flushed
     374        81793 :                 [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
     375        81793 :             let bounds = dst.bounds();
     376        81793 :             let mut view = dst.slice({
     377        81793 :                 let start = written_range.len().into_usize();
     378        81793 :                 let end = start
     379        81793 :                     .checked_add(maybe_flushed_range.len().into_usize())
     380        81793 :                     .unwrap();
     381        81793 :                 start..end
     382              :             });
     383        81793 :             view.as_mut_rust_slice_full_zeroed()
     384        81793 :                 .copy_from_slice(to_copy);
     385        81793 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     386              :         } else {
     387       180667 :             dst
     388              :         };
     389              : 
     390       262460 :         drop(writer);
     391              : 
     392       262460 :         let dst = if written_range.len() > 0 {
     393        18645 :             let bounds = dst.bounds();
     394        18645 :             let slice = self
     395        18645 :                 .file
     396        18645 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     397        18645 :                 .await?;
     398        18645 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     399              :         } else {
     400       243815 :             dst
     401              :         };
     402              : 
     403              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     404              : 
     405       262460 :         Ok((dst, (end - start).into_usize()))
     406       262460 :     }
     407              : }
     408              : 
     409              : /// Does the given filename look like an ephemeral file?
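                       : ///
                       : /// E.g. `ephemeral-42` matches, while `ephemeral-` or `wal-42` do not.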
     410            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     411            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
      412            0 :         rest.parse::<u64>().is_ok() // the disambiguator counter is a u64
     413              :     } else {
     414            0 :         false
     415              :     }
     416            0 : }
     417              : 
     418              : #[cfg(test)]
     419              : mod tests {
     420              :     use std::fs;
     421              :     use std::str::FromStr;
     422              : 
     423              :     use rand::Rng;
     424              : 
     425              :     use super::*;
     426              :     use crate::context::DownloadBehavior;
     427              :     use crate::task_mgr::TaskKind;
     428              : 
     429            4 :     fn harness(
     430            4 :         test_name: &str,
     431            4 :     ) -> Result<
     432            4 :         (
     433            4 :             &'static PageServerConf,
     434            4 :             TenantShardId,
     435            4 :             TimelineId,
     436            4 :             RequestContext,
     437            4 :         ),
     438            4 :         io::Error,
     439            4 :     > {
     440            4 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     441            4 :         let _ = fs::remove_dir_all(&repo_dir);
     442            4 :         let conf = PageServerConf::dummy_conf(repo_dir);
     443              :         // Make a static copy of the config. This can never be free'd, but that's
     444              :         // OK in a test.
     445            4 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     446              : 
     447            4 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     448            4 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     449            4 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     450              : 
     451            4 :         let ctx =
     452            4 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     453              : 
     454            4 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     455            4 :     }
     456              : 
     457              :     #[tokio::test]
     458            1 :     async fn ephemeral_file_holds_gate_open() {
     459              :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     460              : 
     461            1 :         let (conf, tenant_id, timeline_id, ctx) =
     462            1 :             harness("ephemeral_file_holds_gate_open").unwrap();
     463              : 
     464            1 :         let gate = utils::sync::gate::Gate::default();
     465            1 :         let cancel = CancellationToken::new();
     466              : 
     467            1 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     468            1 :             .await
     469            1 :             .unwrap();
     470              : 
     471            1 :         let mut closing = tokio::task::spawn(async move {
     472            1 :             gate.close().await;
     473            1 :         });
     474              : 
     475              :         // gate is entered until the ephemeral file is dropped
     476              :         // do not start paused tokio-epoll-uring has a sleep loop
      477              :         // do not start paused: tokio-epoll-uring has a sleep loop
     478            1 :         tokio::time::timeout(FOREVER, &mut closing)
     479            1 :             .await
     480            1 :             .expect_err("closing cannot complete before dropping");
     481              : 
     482              :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
      483              :         // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate
     484            1 :         drop(file);
     485              : 
     486            1 :         tokio::time::timeout(FOREVER, &mut closing)
     487            1 :             .await
     488            1 :             .expect("closing completes right away")
     489            1 :             .expect("closing does not panic");
     490            1 :     }
     491              : 
     492              :     #[tokio::test]
     493            1 :     async fn test_ephemeral_file_basics() {
     494            1 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     495              : 
     496            1 :         let gate = utils::sync::gate::Gate::default();
     497            1 :         let cancel = CancellationToken::new();
     498              : 
     499            1 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     500            1 :             .await
     501            1 :             .unwrap();
     502              : 
     503            1 :         let writer = file.buffered_writer.read().await;
     504            1 :         let mutable = writer.mutable();
     505            1 :         let cap = mutable.capacity();
     506            1 :         let align = mutable.align();
     507            1 :         drop(writer);
     508              : 
     509            1 :         let write_nbytes = cap * 2 + cap / 2;
     510              : 
     511            1 :         let content: Vec<u8> = rand::thread_rng()
     512            1 :             .sample_iter(rand::distributions::Standard)
     513            1 :             .take(write_nbytes)
     514            1 :             .collect();
     515              : 
     516            1 :         let mut value_offsets = Vec::new();
     517          320 :         for range in (0..write_nbytes)
     518            1 :             .step_by(align)
     519          320 :             .map(|start| start..(start + align).min(write_nbytes))
     520              :         {
     521          320 :             let off = file.write_raw(&content[range], &ctx).await.unwrap();
     522          320 :             value_offsets.push(off);
     523              :         }
     524              : 
     525            1 :         assert_eq!(file.len() as usize, write_nbytes);
     526          320 :         for (i, range) in (0..write_nbytes)
     527            1 :             .step_by(align)
     528          320 :             .map(|start| start..(start + align).min(write_nbytes))
     529            1 :             .enumerate()
     530              :         {
     531          320 :             assert_eq!(value_offsets[i], range.start.into_u64());
     532          320 :             let buf = IoBufferMut::with_capacity(range.len());
     533          320 :             let (buf_slice, nread) = file
     534          320 :                 .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
     535          320 :                 .await
     536          320 :                 .unwrap();
     537          320 :             let buf = buf_slice.into_inner();
     538          320 :             assert_eq!(nread, range.len());
     539          320 :             assert_eq!(&buf, &content[range]);
     540              :         }
     541              : 
     542            1 :         let file_contents = std::fs::read(file.file.path()).unwrap();
     543            1 :         assert!(file_contents == content[0..cap * 2]);
     544              : 
     545            1 :         let writer = file.buffered_writer.read().await;
     546            1 :         let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
     547            1 :         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
     548              : 
     549            1 :         let mutable_buffer_contents = writer.mutable();
     550            1 :         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     551            1 :     }
     552              : 
     553              :     #[tokio::test]
     554            1 :     async fn test_flushes_do_happen() {
     555            1 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     556              : 
     557            1 :         let gate = utils::sync::gate::Gate::default();
     558            1 :         let cancel = CancellationToken::new();
     559            1 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     560            1 :             .await
     561            1 :             .unwrap();
     562              : 
      563              :         // mutable buffer and maybe_flushed buffer each have `cap` bytes.
     564            1 :         let writer = file.buffered_writer.read().await;
     565            1 :         let cap = writer.mutable().capacity();
     566            1 :         drop(writer);
     567              : 
     568            1 :         let content: Vec<u8> = rand::thread_rng()
     569            1 :             .sample_iter(rand::distributions::Standard)
     570            1 :             .take(cap * 2 + cap / 2)
     571            1 :             .collect();
     572              : 
     573            1 :         file.write_raw(&content, &ctx).await.unwrap();
     574              : 
     575              :         // assert the state is as this test expects it to be
     576            1 :         let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
     577            1 :         assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
     578            1 :         let md = file.file.path().metadata().unwrap();
     579            1 :         assert_eq!(
     580            1 :             md.len(),
     581            1 :             2 * cap.into_u64(),
     582            0 :             "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
     583              :         );
     584            1 :         let writer = file.buffered_writer.read().await;
     585            1 :         assert_eq!(
     586            1 :             &writer.inspect_maybe_flushed().unwrap()[0..cap],
     587            1 :             &content[cap..cap * 2]
     588              :         );
     589            1 :         assert_eq!(
     590            1 :             &writer.mutable()[0..cap / 2],
     591            1 :             &content[cap * 2..cap * 2 + cap / 2]
     592            1 :         );
     593            1 :     }
     594              : 
     595              :     #[tokio::test]
     596            1 :     async fn test_read_split_across_file_and_buffer() {
     597              :         // This test exercises the logic on the read path that splits the logical read
     598              :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     599              :         //
      600              :         // This test builds on the assertions in test_flushes_do_happen.
     601              : 
     602            1 :         let (conf, tenant_id, timeline_id, ctx) =
     603            1 :             harness("test_read_split_across_file_and_buffer").unwrap();
     604              : 
     605            1 :         let gate = utils::sync::gate::Gate::default();
     606            1 :         let cancel = CancellationToken::new();
     607              : 
     608            1 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     609            1 :             .await
     610            1 :             .unwrap();
     611              : 
     612            1 :         let writer = file.buffered_writer.read().await;
     613            1 :         let mutable = writer.mutable();
     614            1 :         let cap = mutable.capacity();
     615            1 :         let align = mutable.align();
     616            1 :         drop(writer);
     617            1 :         let content: Vec<u8> = rand::thread_rng()
     618            1 :             .sample_iter(rand::distributions::Standard)
     619            1 :             .take(cap * 2 + cap / 2)
     620            1 :             .collect();
     621              : 
     622            1 :         let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
     623              : 
     624           27 :         let test_read = |start: usize, len: usize| {
     625           27 :             let file = &file;
     626           27 :             let ctx = &ctx;
     627           27 :             let content = &content;
     628           27 :             async move {
     629           27 :                 let (buf, nread) = file
     630           27 :                     .read_exact_at_eof_ok(
     631           27 :                         start.into_u64(),
     632           27 :                         IoBufferMut::with_capacity(len).slice_full(),
     633           27 :                         ctx,
     634           27 :                     )
     635           27 :                     .await
     636           27 :                     .unwrap();
     637           27 :                 assert_eq!(nread, len);
     638           27 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     639           27 :             }
     640           27 :         };
     641              : 
     642            3 :         let test_read_all_offset_combinations = || {
     643            3 :             async move {
     644            3 :                 test_read(align, align).await;
     645              :                 // border onto edge of file
     646            3 :                 test_read(cap - align, align).await;
     647              :                 // read across file and buffer
     648            3 :                 test_read(cap - align, 2 * align).await;
      649              :                 // start exactly at the beginning of the maybe flushed buffer
     650            3 :                 test_read(cap, align).await;
     651              :                 // completely within maybe flushed buffer
     652            3 :                 test_read(cap + align, align).await;
     653              :                 // border onto edge of maybe flushed buffer.
     654            3 :                 test_read(cap * 2 - align, align).await;
     655              :                 // read across maybe flushed and mutable buffer
     656            3 :                 test_read(cap * 2 - align, 2 * align).await;
     657              :                 // read across three segments
     658            3 :                 test_read(cap - align, cap + 2 * align).await;
     659              :                 // completely within mutable buffer
     660            3 :                 test_read(cap * 2 + align, align).await;
     661            3 :             }
     662            3 :         };
     663              : 
     664              :         // completely within the file range
     665            1 :         assert!(align < cap, "test assumption");
     666            1 :         assert!(cap % align == 0);
     667              : 
     668              :         // test reads at different flush stages.
     669            1 :         let not_started = control.unwrap().into_not_started();
     670            1 :         test_read_all_offset_combinations().await;
     671            1 :         let in_progress = not_started.ready_to_flush();
     672            1 :         test_read_all_offset_combinations().await;
     673            1 :         in_progress.wait_until_flush_is_done().await;
     674            1 :         test_read_all_offset_combinations().await;
     675            1 :     }
     676              : }
        

Generated by: LCOV version 2.1-beta