LCOV - code coverage report
Current view: top level - pageserver/src/tenant - ephemeral_file.rs (source / functions) Coverage Total Hit
Test: 13fa4b48c3603751d5b1568465c493b8925758a2.info Lines: 97.0 % 438 425
Test Date: 2025-03-19 18:46:26 Functions: 91.2 % 34 31

            Line data    Source code
       1              : //! Implementation of append-only file data structure
       2              : //! used to keep in-memory layers spilled on disk.
       3              : 
       4              : use std::io;
       5              : use std::sync::Arc;
       6              : use std::sync::atomic::AtomicU64;
       7              : 
       8              : use camino::Utf8PathBuf;
       9              : use num_traits::Num;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use tokio_epoll_uring::{BoundedBuf, Slice};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{error, info_span};
      14              : use utils::id::TimelineId;
      15              : 
      16              : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
      17              : use crate::config::PageServerConf;
      18              : use crate::context::RequestContext;
      19              : use crate::page_cache;
      20              : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
      21              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      22              : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
      23              : use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
      24              : use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
      25              : 
      26              : pub struct EphemeralFile {
      27              :     _tenant_shard_id: TenantShardId,
      28              :     _timeline_id: TimelineId,
      29              :     page_cache_file_id: page_cache::FileId,
      30              :     bytes_written: u64,
      31              :     buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
      32              :     /// Gate guard is held for as long as we need to do operations in the path (delete on drop)
      33              :     _gate_guard: utils::sync::gate::GateGuard,
      34              : }
      35              : 
      36              : const TAIL_SZ: usize = 64 * 1024;
      37              : 
      38              : impl EphemeralFile {
      39         2632 :     pub async fn create(
      40         2632 :         conf: &PageServerConf,
      41         2632 :         tenant_shard_id: TenantShardId,
      42         2632 :         timeline_id: TimelineId,
      43         2632 :         gate: &utils::sync::gate::Gate,
      44         2632 :         cancel: &CancellationToken,
      45         2632 :         ctx: &RequestContext,
      46         2632 :     ) -> anyhow::Result<EphemeralFile> {
      47              :         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
      48         2632 :         let filename_disambiguator =
      49         2632 :             NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      50         2632 : 
      51         2632 :         let filename = conf
      52         2632 :             .timeline_path(&tenant_shard_id, &timeline_id)
      53         2632 :             .join(Utf8PathBuf::from(format!(
      54         2632 :                 "ephemeral-{filename_disambiguator}"
      55         2632 :             )));
      56              : 
      57         2632 :         let file = Arc::new(
      58         2632 :             VirtualFile::open_with_options_v2(
      59         2632 :                 &filename,
      60         2632 :                 virtual_file::OpenOptions::new()
      61         2632 :                     .read(true)
      62         2632 :                     .write(true)
      63         2632 :                     .create(true),
      64         2632 :                 ctx,
      65         2632 :             )
      66         2632 :             .await?,
      67              :         );
      68              : 
      69         2632 :         let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
      70         2632 : 
      71         2632 :         Ok(EphemeralFile {
      72         2632 :             _tenant_shard_id: tenant_shard_id,
      73         2632 :             _timeline_id: timeline_id,
      74         2632 :             page_cache_file_id,
      75         2632 :             bytes_written: 0,
      76         2632 :             buffered_writer: owned_buffers_io::write::BufferedWriter::new(
      77         2632 :                 file,
      78         5264 :                 || IoBufferMut::with_capacity(TAIL_SZ),
      79         2632 :                 gate.enter()?,
      80         2632 :                 cancel.child_token(),
      81         2632 :                 ctx,
      82         2632 :                 info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
      83              :             ),
      84         2632 :             _gate_guard: gate.enter()?,
      85              :         })
      86         2632 :     }
      87              : }
      88              : 
      89              : impl Drop for EphemeralFile {
      90         2380 :     fn drop(&mut self) {
      91         2380 :         // unlink the file
      92         2380 :         // we are clear to do this, because we have entered a gate
      93         2380 :         let path = self.buffered_writer.as_inner().path();
      94         2380 :         let res = std::fs::remove_file(path);
      95         2380 :         if let Err(e) = res {
      96            4 :             if e.kind() != std::io::ErrorKind::NotFound {
      97              :                 // just never log the not found errors, we cannot do anything for them; on detach
      98              :                 // the tenant directory is already gone.
      99              :                 //
     100              :                 // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
     101            0 :                 error!("could not remove ephemeral file '{path}': {e}");
     102            4 :             }
     103         2376 :         }
     104         2380 :     }
     105              : }
     106              : 
     107              : #[derive(Debug, thiserror::Error)]
     108              : pub(crate) enum EphemeralFileWriteError {
     109              :     #[error("{0}")]
     110              :     TooLong(String),
     111              :     #[error("cancelled")]
     112              :     Cancelled,
     113              : }
     114              : 
     115              : impl EphemeralFile {
     116     19226220 :     pub(crate) fn len(&self) -> u64 {
     117     19226220 :         self.bytes_written
     118     19226220 :     }
     119              : 
     120         2616 :     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
     121         2616 :         self.page_cache_file_id
     122         2616 :     }
     123              : 
     124         1940 :     pub(crate) async fn load_to_io_buf(
     125         1940 :         &self,
     126         1940 :         ctx: &RequestContext,
     127         1940 :     ) -> Result<IoBufferMut, io::Error> {
     128         1940 :         let size = self.len().into_usize();
     129         1940 :         let buf = IoBufferMut::with_capacity(size);
     130         1940 :         let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
     131         1940 :         assert_eq!(nread, size);
     132         1940 :         let buf = slice.into_inner();
     133         1940 :         assert_eq!(buf.len(), nread);
     134         1940 :         assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
     135         1940 :         Ok(buf)
     136         1940 :     }
     137              : 
     138              :     /// Returns the offset at which the first byte of the input was written, for use
     139              :     /// in constructing indices over the written value.
     140              :     ///
     141              :     /// Panics if the write is short because there's no way we can recover from that.
     142              :     /// TODO: make upstack handle this as an error.
     143      9609764 :     pub(crate) async fn write_raw(
     144      9609764 :         &mut self,
     145      9609764 :         srcbuf: &[u8],
     146      9609764 :         ctx: &RequestContext,
     147      9609764 :     ) -> Result<u64, EphemeralFileWriteError> {
     148      9609764 :         let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
     149      9609764 :         if let Some(control) = control {
     150        11068 :             control.release().await;
     151      9598696 :         }
     152      9609764 :         Ok(pos)
     153      9609764 :     }
     154              : 
     155      9609768 :     async fn write_raw_controlled(
     156      9609768 :         &mut self,
     157      9609768 :         srcbuf: &[u8],
     158      9609768 :         ctx: &RequestContext,
     159      9609768 :     ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
     160      9609768 :         let pos = self.bytes_written;
     161              : 
     162      9609768 :         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
     163            0 :             EphemeralFileWriteError::TooLong(format!(
     164            0 :                 "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
     165            0 :                 srcbuf_len = srcbuf.len(),
     166            0 :             ))
     167      9609768 :         })?;
     168              : 
     169              :         // Write the payload
     170      9609768 :         let (nwritten, control) = self
     171      9609768 :             .buffered_writer
     172      9609768 :             .write_buffered_borrowed_controlled(srcbuf, ctx)
     173      9609768 :             .await
     174      9609768 :             .map_err(|e| match e {
     175            0 :                 FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
     176      9609768 :             })?;
     177      9609768 :         assert_eq!(
     178      9609768 :             nwritten,
     179      9609768 :             srcbuf.len(),
     180            0 :             "buffered writer has no short writes"
     181              :         );
     182              : 
     183      9609768 :         self.bytes_written = new_bytes_written;
     184      9609768 : 
     185      9609768 :         Ok((pos, control))
     186      9609768 :     }
     187              : }
     188              : 
     189              : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
     190       997507 :     async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
     191       997507 :         &self,
     192       997507 :         start: u64,
     193       997507 :         dst: tokio_epoll_uring::Slice<B>,
     194       997507 :         ctx: &RequestContext,
     195       997507 :     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
     196       997507 :         let submitted_offset = self.buffered_writer.bytes_submitted();
     197              : 
     198       997507 :         let mutable = match self.buffered_writer.inspect_mutable() {
     199       997507 :             Some(mutable) => &mutable[0..mutable.pending()],
     200              :             None => {
     201              :                 // Timeline::cancel and hence buffered writer flush was cancelled.
     202              :                 // Remain read-available while timeline is shutting down.
     203            0 :                 &[]
     204              :             }
     205              :         };
     206              : 
     207       997507 :         let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
     208       997507 : 
     209       997507 :         let dst_cap = dst.bytes_total().into_u64();
     210       997507 :         let end = {
     211              :             // saturating_add is correct here because the max file size is u64::MAX, so,
     212              :             // if start + dst.len() > u64::MAX, then we know it will be a short read
     213       997507 :             let mut end: u64 = start.saturating_add(dst_cap);
     214       997507 :             if end > self.bytes_written {
     215       554384 :                 end = self.bytes_written;
     216       554384 :             }
     217       997507 :             end
     218              :         };
     219              : 
     220              :         // inclusive, exclusive
     221              :         #[derive(Debug)]
     222              :         struct Range<N>(N, N);
     223              :         impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
     224      6613125 :             fn len(&self) -> N {
     225      6613125 :                 if self.0 > self.1 {
     226      3549342 :                     N::zero()
     227              :                 } else {
     228      3063783 :                     self.1 - self.0
     229              :                 }
     230      6613125 :             }
     231              :         }
     232              : 
     233       997507 :         let (written_range, maybe_flushed_range) = {
     234       997507 :             if maybe_flushed.is_some() {
     235              :                 // [       written       ][ maybe_flushed ][    mutable    ]
     236              :                 //                                         ^
     237              :                 //                                 `submitted_offset`
     238              :                 // <++++++ on disk +++++++????????????????>
     239       976760 :                 (
     240       976760 :                     Range(
     241       976760 :                         start,
     242       976760 :                         std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     243       976760 :                     ),
     244       976760 :                     Range(
     245       976760 :                         std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
     246       976760 :                         std::cmp::min(end, submitted_offset),
     247       976760 :                     ),
     248       976760 :                 )
     249              :             } else {
     250              :                 // [       written                        ][    mutable    ]
     251              :                 //                                         ^
     252              :                 //                                 `submitted_offset`
     253              :                 // <++++++ on disk +++++++++++++++++++++++>
     254        20747 :                 (
     255        20747 :                     Range(start, std::cmp::min(end, submitted_offset)),
     256        20747 :                     // zero len
     257        20747 :                     Range(submitted_offset, u64::MIN),
     258        20747 :                 )
     259              :             }
     260              :         };
     261              : 
     262       997507 :         let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
     263              : 
     264       997507 :         let dst = if written_range.len() > 0 {
     265        20312 :             let file: &VirtualFile = self.buffered_writer.as_inner();
     266        20312 :             let bounds = dst.bounds();
     267        20312 :             let slice = file
     268        20312 :                 .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
     269        20312 :                 .await?;
     270        20312 :             Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
     271              :         } else {
     272       977195 :             dst
     273              :         };
     274              : 
     275       997507 :         let dst = if maybe_flushed_range.len() > 0 {
     276       320872 :             let offset_in_buffer = maybe_flushed_range
     277       320872 :                 .0
     278       320872 :                 .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
     279       320872 :                 .unwrap()
     280       320872 :                 .into_usize();
     281       320872 :             // Checked previously the buffer is Some.
     282       320872 :             let maybe_flushed = maybe_flushed.unwrap();
     283       320872 :             let to_copy = &maybe_flushed
     284       320872 :                 [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
     285       320872 :             let bounds = dst.bounds();
     286       320872 :             let mut view = dst.slice({
     287       320872 :                 let start = written_range.len().into_usize();
     288       320872 :                 let end = start
     289       320872 :                     .checked_add(maybe_flushed_range.len().into_usize())
     290       320872 :                     .unwrap();
     291       320872 :                 start..end
     292       320872 :             });
     293       320872 :             view.as_mut_rust_slice_full_zeroed()
     294       320872 :                 .copy_from_slice(to_copy);
     295       320872 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     296              :         } else {
     297       676635 :             dst
     298              :         };
     299              : 
     300       997507 :         let dst = if mutable_range.len() > 0 {
     301       659419 :             let offset_in_buffer = mutable_range
     302       659419 :                 .0
     303       659419 :                 .checked_sub(submitted_offset)
     304       659419 :                 .unwrap()
     305       659419 :                 .into_usize();
     306       659419 :             let to_copy =
     307       659419 :                 &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
     308       659419 :             let bounds = dst.bounds();
     309       659419 :             let mut view = dst.slice({
     310       659419 :                 let start =
     311       659419 :                     written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
     312       659419 :                 let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
     313       659419 :                 start..end
     314       659419 :             });
     315       659419 :             view.as_mut_rust_slice_full_zeroed()
     316       659419 :                 .copy_from_slice(to_copy);
     317       659419 :             Slice::from_buf_bounds(Slice::into_inner(view), bounds)
     318              :         } else {
     319       338088 :             dst
     320              :         };
     321              : 
     322              :         // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
     323              : 
     324       997507 :         Ok((dst, (end - start).into_usize()))
     325       997507 :     }
     326              : }
     327              : 
     328              : /// Does the given filename look like an ephemeral file?
     329            0 : pub fn is_ephemeral_file(filename: &str) -> bool {
     330            0 :     if let Some(rest) = filename.strip_prefix("ephemeral-") {
     331            0 :         rest.parse::<u32>().is_ok()
     332              :     } else {
     333            0 :         false
     334              :     }
     335            0 : }
     336              : 
     337              : #[cfg(test)]
     338              : mod tests {
     339              :     use std::fs;
     340              :     use std::str::FromStr;
     341              : 
     342              :     use rand::Rng;
     343              : 
     344              :     use super::*;
     345              :     use crate::context::DownloadBehavior;
     346              :     use crate::task_mgr::TaskKind;
     347              : 
     348           16 :     fn harness(
     349           16 :         test_name: &str,
     350           16 :     ) -> Result<
     351           16 :         (
     352           16 :             &'static PageServerConf,
     353           16 :             TenantShardId,
     354           16 :             TimelineId,
     355           16 :             RequestContext,
     356           16 :         ),
     357           16 :         io::Error,
     358           16 :     > {
     359           16 :         let repo_dir = PageServerConf::test_repo_dir(test_name);
     360           16 :         let _ = fs::remove_dir_all(&repo_dir);
     361           16 :         let conf = PageServerConf::dummy_conf(repo_dir);
     362           16 :         // Make a static copy of the config. This can never be freed, but that's
     363           16 :         // OK in a test.
     364           16 :         let conf: &'static PageServerConf = Box::leak(Box::new(conf));
     365           16 : 
     366           16 :         let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
     367           16 :         let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
     368           16 :         fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
     369              : 
     370           16 :         let ctx =
     371           16 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
     372           16 : 
     373           16 :         Ok((conf, tenant_shard_id, timeline_id, ctx))
     374           16 :     }
     375              : 
     376              :     #[tokio::test]
     377            4 :     async fn ephemeral_file_holds_gate_open() {
     378            4 :         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
     379            4 : 
     380            4 :         let (conf, tenant_id, timeline_id, ctx) =
     381            4 :             harness("ephemeral_file_holds_gate_open").unwrap();
     382            4 : 
     383            4 :         let gate = utils::sync::gate::Gate::default();
     384            4 :         let cancel = CancellationToken::new();
     385            4 : 
     386            4 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     387            4 :             .await
     388            4 :             .unwrap();
     389            4 : 
     390            4 :         let mut closing = tokio::task::spawn(async move {
     391            4 :             gate.close().await;
     392            4 :         });
     393            4 : 
     394            4 :         // gate is entered until the ephemeral file is dropped
     395            4 :         // do not start paused, because tokio-epoll-uring has a sleep loop
     396            4 :         tokio::time::pause();
     397            4 :         tokio::time::timeout(FOREVER, &mut closing)
     398            4 :             .await
     399            4 :             .expect_err("closing cannot complete before dropping");
     400            4 : 
     401            4 :         // this is a requirement of the reset_tenant functionality: we have to be able to restart a
     402            4 :         // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate
     403            4 :         drop(file);
     404            4 : 
     405            4 :         tokio::time::timeout(FOREVER, &mut closing)
     406            4 :             .await
     407            4 :             .expect("closing completes right away")
     408            4 :             .expect("closing does not panic");
     409            4 :     }
     410              : 
     411              :     #[tokio::test]
     412            4 :     async fn test_ephemeral_file_basics() {
     413            4 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
     414            4 : 
     415            4 :         let gate = utils::sync::gate::Gate::default();
     416            4 :         let cancel = CancellationToken::new();
     417            4 : 
     418            4 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     419            4 :             .await
     420            4 :             .unwrap();
     421            4 : 
     422            4 :         let mutable = file.buffered_writer.mutable();
     423            4 :         let cap = mutable.capacity();
     424            4 :         let align = mutable.align();
     425            4 : 
     426            4 :         let write_nbytes = cap * 2 + cap / 2;
     427            4 : 
     428            4 :         let content: Vec<u8> = rand::thread_rng()
     429            4 :             .sample_iter(rand::distributions::Standard)
     430            4 :             .take(write_nbytes)
     431            4 :             .collect();
     432            4 : 
     433            4 :         let mut value_offsets = Vec::new();
     434         1280 :         for range in (0..write_nbytes)
     435            4 :             .step_by(align)
     436         1280 :             .map(|start| start..(start + align).min(write_nbytes))
     437            4 :         {
     438         1280 :             let off = file.write_raw(&content[range], &ctx).await.unwrap();
     439         1280 :             value_offsets.push(off);
     440            4 :         }
     441            4 : 
     442            4 :         assert_eq!(file.len() as usize, write_nbytes);
     443         1280 :         for (i, range) in (0..write_nbytes)
     444            4 :             .step_by(align)
     445         1280 :             .map(|start| start..(start + align).min(write_nbytes))
     446            4 :             .enumerate()
     447            4 :         {
     448         1280 :             assert_eq!(value_offsets[i], range.start.into_u64());
     449         1280 :             let buf = IoBufferMut::with_capacity(range.len());
     450         1280 :             let (buf_slice, nread) = file
     451         1280 :                 .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
     452         1280 :                 .await
     453         1280 :                 .unwrap();
     454         1280 :             let buf = buf_slice.into_inner();
     455         1280 :             assert_eq!(nread, range.len());
     456         1280 :             assert_eq!(&buf, &content[range]);
     457            4 :         }
     458            4 : 
     459            4 :         let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
     460            4 :         assert!(file_contents == content[0..cap * 2]);
     461            4 : 
     462            4 :         let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
     463            4 :         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
     464            4 : 
     465            4 :         let mutable_buffer_contents = file.buffered_writer.mutable();
     466            4 :         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     467            4 :     }
     468              : 
     469              :     #[tokio::test]
     470            4 :     async fn test_flushes_do_happen() {
     471            4 :         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
     472            4 : 
     473            4 :         let gate = utils::sync::gate::Gate::default();
     474            4 :         let cancel = CancellationToken::new();
     475            4 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     476            4 :             .await
     477            4 :             .unwrap();
     478            4 : 
     479            4 :         // mutable buffer and maybe_flushed buffer each has `cap` bytes.
     480            4 :         let cap = file.buffered_writer.mutable().capacity();
     481            4 : 
     482            4 :         let content: Vec<u8> = rand::thread_rng()
     483            4 :             .sample_iter(rand::distributions::Standard)
     484            4 :             .take(cap * 2 + cap / 2)
     485            4 :             .collect();
     486            4 : 
     487            4 :         file.write_raw(&content, &ctx).await.unwrap();
     488            4 : 
     489            4 :         // assert the state is as this test expects it to be
     490            4 :         let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
     491            4 :         assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
     492            4 :         let md = file.buffered_writer.as_inner().path().metadata().unwrap();
     493            4 :         assert_eq!(
     494            4 :             md.len(),
     495            4 :             2 * cap.into_u64(),
     496            4 :             "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
     497            4 :         );
     498            4 :         assert_eq!(
     499            4 :             &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
     500            4 :             &content[cap..cap * 2]
     501            4 :         );
     502            4 :         assert_eq!(
     503            4 :             &file.buffered_writer.mutable()[0..cap / 2],
     504            4 :             &content[cap * 2..cap * 2 + cap / 2]
     505            4 :         );
     506            4 :     }
     507              : 
     508              :     #[tokio::test]
     509            4 :     async fn test_read_split_across_file_and_buffer() {
     510            4 :         // This test exercises the logic on the read path that splits the logical read
     511            4 :         // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
     512            4 :         //
     513            4 :         // This test builds on the assertions in test_flushes_do_happen
     514            4 : 
     515            4 :         let (conf, tenant_id, timeline_id, ctx) =
     516            4 :             harness("test_read_split_across_file_and_buffer").unwrap();
     517            4 : 
     518            4 :         let gate = utils::sync::gate::Gate::default();
     519            4 :         let cancel = CancellationToken::new();
     520            4 : 
     521            4 :         let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
     522            4 :             .await
     523            4 :             .unwrap();
     524            4 : 
     525            4 :         let mutable = file.buffered_writer.mutable();
     526            4 :         let cap = mutable.capacity();
     527            4 :         let align = mutable.align();
     528            4 :         let content: Vec<u8> = rand::thread_rng()
     529            4 :             .sample_iter(rand::distributions::Standard)
     530            4 :             .take(cap * 2 + cap / 2)
     531            4 :             .collect();
     532            4 : 
     533            4 :         let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
     534            4 : 
     535          108 :         let test_read = |start: usize, len: usize| {
     536          108 :             let file = &file;
     537          108 :             let ctx = &ctx;
     538          108 :             let content = &content;
     539          108 :             async move {
     540          108 :                 let (buf, nread) = file
     541          108 :                     .read_exact_at_eof_ok(
     542          108 :                         start.into_u64(),
     543          108 :                         IoBufferMut::with_capacity(len).slice_full(),
     544          108 :                         ctx,
     545          108 :                     )
     546          108 :                     .await
     547          108 :                     .unwrap();
     548          108 :                 assert_eq!(nread, len);
     549          108 :                 assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
     550          108 :             }
     551          108 :         };
     552            4 : 
     553           12 :         let test_read_all_offset_combinations = || {
     554           12 :             async move {
     555           12 :                 test_read(align, align).await;
     556            4 :                 // border onto edge of file
     557           12 :                 test_read(cap - align, align).await;
     558            4 :                 // read across file and buffer
     559           12 :                 test_read(cap - align, 2 * align).await;
     560            4 :                 // stay from start of maybe flushed buffer
     561           12 :                 test_read(cap, align).await;
     562            4 :                 // completely within maybe flushed buffer
     563           12 :                 test_read(cap + align, align).await;
     564            4 :                 // border onto edge of maybe flushed buffer.
     565           12 :                 test_read(cap * 2 - align, align).await;
     566            4 :                 // read across maybe flushed and mutable buffer
     567           12 :                 test_read(cap * 2 - align, 2 * align).await;
     568            4 :                 // read across three segments
     569           12 :                 test_read(cap - align, cap + 2 * align).await;
     570            4 :                 // completely within mutable buffer
     571           12 :                 test_read(cap * 2 + align, align).await;
     572           12 :             }
     573           12 :         };
     574            4 : 
     575            4 :         // completely within the file range
     576            4 :         assert!(align < cap, "test assumption");
     577            4 :         assert!(cap % align == 0);
     578            4 : 
     579            4 :         // test reads at different flush stages.
     580            4 :         let not_started = control.unwrap().into_not_started();
     581            4 :         test_read_all_offset_combinations().await;
     582            4 :         let in_progress = not_started.ready_to_flush();
     583            4 :         test_read_all_offset_combinations().await;
     584            4 :         in_progress.wait_until_flush_is_done().await;
     585            4 :         test_read_all_offset_combinations().await;
     586            4 :     }
     587              : }
        

Generated by: LCOV version 2.1-beta