LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 89.5 % 936 838
Test Date: 2025-04-24 20:31:15 Functions: 80.3 % 198 159

            Line data    Source code
       1              : //! VirtualFile is like a normal File, but it's not bound directly to
       2              : //! a file descriptor.
       3              : //!
//! Instead, the file is (re)opened on demand whenever it's accessed,
//! and if too many files are open globally in the system, least-recently
//! used ones are closed.
       7              : //!
       8              : //! To track which files have been recently used, we use the clock algorithm
       9              : //! with a 'recently_used' flag on each slot.
      10              : //!
      11              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      12              : //! src/backend/storage/file/fd.c
      13              : //!
      14              : use std::fs::File;
      15              : use std::io::{Error, ErrorKind};
      16              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      17              : #[cfg(target_os = "linux")]
      18              : use std::os::unix::fs::OpenOptionsExt;
      19              : use std::sync::LazyLock;
      20              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
      21              : 
      22              : use camino::{Utf8Path, Utf8PathBuf};
      23              : use once_cell::sync::OnceCell;
      24              : use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
      25              : use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
      26              : use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
      27              : use owned_buffers_io::io_buf_ext::FullSlice;
      28              : use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
      29              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      30              : use tokio::time::Instant;
      31              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      32              : 
      33              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      34              : use crate::assert_u64_eq_usize::UsizeIsU64;
      35              : use crate::context::RequestContext;
      36              : use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
      37              : use crate::page_cache::{PAGE_SZ, PageWriteGuard};
      38              : 
      39              : pub(crate) use api::IoMode;
      40              : pub(crate) use io_engine::IoEngineKind;
      41              : pub use io_engine::{
      42              :     FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
      43              :     io_engine_for_bench,
      44              : };
      45              : pub(crate) use metadata::Metadata;
      46              : pub(crate) use open_options::*;
      47              : pub use pageserver_api::models::virtual_file as api;
      48              : pub use temporary::TempVirtualFile;
      49              : 
      50              : pub(crate) mod io_engine;
      51              : mod metadata;
      52              : mod open_options;
      53              : mod temporary;
pub(crate) mod owned_buffers_io {
    //! Abstractions for IO with owned buffers.
    //!
    //! Here, "owned" means the buffer is moved into the I/O operation and handed
    //! back to the caller when it completes (compare `VirtualFile::write_all_at`,
    //! which returns its `FullSlice` alongside the result).
    //!
    //! Not actually tied to [`crate::virtual_file`] specifically, but it's the primary
    //! reason we need this abstraction.
    //!
    //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
    //! but for the time being we're proving out the primitives in the neon.git repo
    //! for faster iteration.

    pub(crate) mod aligned_buffer;
    pub(crate) mod io_buf_aligned;
    pub(crate) mod io_buf_ext;
    pub(crate) mod slice;
    pub(crate) mod write;
}
      70              : 
/// A file handle backed by the global virtual file descriptor cache.
///
/// Thin wrapper around [`VirtualFileInner`] that additionally records the
/// [`IoMode`] the file was opened with.
#[derive(Debug)]
pub struct VirtualFile {
    /// The cached-descriptor state that performs the actual I/O.
    inner: VirtualFileInner,
    /// I/O mode chosen at open time: always `IoMode::Buffered` for
    /// [`VirtualFile::open`], the configured mode for
    /// [`VirtualFile::open_with_options_v2`]. Not read by the code visible here.
    _mode: IoMode,
}
      76              : 
      77              : impl VirtualFile {
      78              :     /// Open a file in read-only mode. Like File::open.
      79         6096 :     pub async fn open<P: AsRef<Utf8Path>>(
      80         6096 :         path: P,
      81         6096 :         ctx: &RequestContext,
      82         6096 :     ) -> Result<Self, std::io::Error> {
      83         6096 :         let inner = VirtualFileInner::open(path, ctx).await?;
      84         6096 :         Ok(VirtualFile {
      85         6096 :             inner,
      86         6096 :             _mode: IoMode::Buffered,
      87         6096 :         })
      88         6096 :     }
      89              : 
      90              :     /// Open a file in read-only mode. Like File::open.
      91              :     ///
      92              :     /// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
      93         7656 :     pub async fn open_v2<P: AsRef<Utf8Path>>(
      94         7656 :         path: P,
      95         7656 :         ctx: &RequestContext,
      96         7656 :     ) -> Result<Self, std::io::Error> {
      97         7656 :         Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
      98         7656 :     }
      99              : 
     100        29676 :     pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
     101        29676 :         path: P,
     102        29676 :         open_options: &OpenOptions,
     103        29676 :         ctx: &RequestContext,
     104        29676 :     ) -> Result<Self, std::io::Error> {
     105        29676 :         let mode = get_io_mode();
     106        29676 :         let set_o_direct = match (mode, open_options.is_write()) {
     107        29676 :             (IoMode::Buffered, _) => false,
     108              :             #[cfg(target_os = "linux")]
     109            0 :             (IoMode::Direct, false) => true,
     110              :             #[cfg(target_os = "linux")]
     111            0 :             (IoMode::Direct, true) => false,
     112              :             #[cfg(target_os = "linux")]
     113            0 :             (IoMode::DirectRw, _) => true,
     114              :         };
     115        29676 :         let open_options = open_options.clone();
     116        29676 :         let open_options = if set_o_direct {
     117              :             #[cfg(target_os = "linux")]
     118              :             {
     119            0 :                 let mut open_options = open_options;
     120            0 :                 open_options.custom_flags(nix::libc::O_DIRECT);
     121            0 :                 open_options
     122              :             }
     123              :             #[cfg(not(target_os = "linux"))]
     124              :             unreachable!(
     125              :                 "O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
     126              :             );
     127              :         } else {
     128        29676 :             open_options
     129              :         };
     130        29676 :         let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     131        29676 :         Ok(VirtualFile { inner, _mode: mode })
     132        29676 :     }
     133              : 
     134         8988 :     pub fn path(&self) -> &Utf8Path {
     135         8988 :         self.inner.path.as_path()
     136         8988 :     }
     137              : 
     138          132 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     139          132 :         final_path: Utf8PathBuf,
     140          132 :         tmp_path: Utf8PathBuf,
     141          132 :         content: B,
     142          132 :     ) -> std::io::Result<()> {
     143          132 :         VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
     144          132 :     }
     145              : 
     146        17124 :     pub async fn sync_all(&self) -> Result<(), Error> {
     147        17124 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     148            0 :             return Ok(());
     149        17124 :         }
     150        17124 :         self.inner.sync_all().await
     151        17124 :     }
     152              : 
     153            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     154            0 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     155            0 :             return Ok(());
     156            0 :         }
     157            0 :         self.inner.sync_data().await
     158            0 :     }
     159              : 
     160           84 :     pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
     161           84 :         self.inner.set_len(len, ctx).await
     162           84 :     }
     163              : 
     164        10992 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     165        10992 :         self.inner.metadata().await
     166        10992 :     }
     167              : 
     168      1841698 :     pub async fn read_exact_at<Buf>(
     169      1841698 :         &self,
     170      1841698 :         slice: Slice<Buf>,
     171      1841698 :         offset: u64,
     172      1841698 :         ctx: &RequestContext,
     173      1841698 :     ) -> Result<Slice<Buf>, Error>
     174      1841698 :     where
     175      1841698 :         Buf: IoBufAlignedMut + Send,
     176      1841698 :     {
     177      1841698 :         self.inner.read_exact_at(slice, offset, ctx).await
     178      1841698 :     }
     179              : 
     180       185181 :     pub async fn read_exact_at_page(
     181       185181 :         &self,
     182       185181 :         page: PageWriteGuard<'static>,
     183       185181 :         offset: u64,
     184       185181 :         ctx: &RequestContext,
     185       185181 :     ) -> Result<PageWriteGuard<'static>, Error> {
     186       185181 :         self.inner.read_exact_at_page(page, offset, ctx).await
     187       185181 :     }
     188              : 
     189       237203 :     pub async fn write_all_at<Buf: IoBufAligned + Send>(
     190       237203 :         &self,
     191       237203 :         buf: FullSlice<Buf>,
     192       237203 :         offset: u64,
     193       237203 :         ctx: &RequestContext,
     194       237203 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     195       237203 :         self.inner.write_all_at(buf, offset, ctx).await
     196       237203 :     }
     197              : 
     198            0 :     pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
     199            0 :         path: P,
     200            0 :         ctx: &RequestContext,
     201            0 :     ) -> std::io::Result<String> {
     202            0 :         let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
     203            0 :         let mut buf = Vec::new();
     204            0 :         let mut tmp = vec![0; 128];
     205            0 :         let mut pos: u64 = 0;
     206              :         loop {
     207            0 :             let slice = tmp.slice(..128);
     208            0 :             let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
     209            0 :             match res {
     210            0 :                 Ok(0) => break,
     211            0 :                 Ok(n) => {
     212            0 :                     pos += n as u64;
     213            0 :                     buf.extend_from_slice(&slice[..n]);
     214            0 :                 }
     215            0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     216            0 :                 Err(e) => return Err(e),
     217              :             }
     218            0 :             tmp = slice.into_inner();
     219              :         }
     220            0 :         String::from_utf8(buf).map_err(|_| {
     221            0 :             std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
     222            0 :         })
     223            0 :     }
     224              : }
     225              : 
     226              : /// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing
     227              : /// files. Switching this off is unsafe and only used for testing on machines
     228              : /// with slow drives.
     229              : #[repr(u8)]
     230              : pub enum SyncMode {
     231              :     Sync,
     232              :     UnsafeNoSync,
     233              : }
     234              : 
     235              : impl TryFrom<u8> for SyncMode {
     236              :     type Error = u8;
     237              : 
     238            0 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
     239            0 :         Ok(match value {
     240            0 :             v if v == (SyncMode::Sync as u8) => SyncMode::Sync,
     241            0 :             v if v == (SyncMode::UnsafeNoSync as u8) => SyncMode::UnsafeNoSync,
     242            0 :             x => return Err(x),
     243              :         })
     244            0 :     }
     245              : }
     246              : 
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
/// and re-opened when it's accessed again.
///
/// Like with std::fs::File, multiple threads can read/write the file concurrently,
/// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
/// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
/// require a mutable reference, because they modify the "current position".
///
/// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
/// slot that 'handle' points to, if the underlying file is currently open. If it's not
/// currently open, the 'handle' can still point to the slot where it was last kept. The
/// handle's 'tag' field is used to detect whether the handle is still valid.
#[derive(Debug)]
pub struct VirtualFileInner {
    /// Lazy handle to the global file descriptor cache. The slot that this points to
    /// might contain our File, or it may be empty, or it may contain a File that
    /// belongs to a different VirtualFile.
    handle: RwLock<SlotHandle>,

    /// Path of the file; used to re-open it after its descriptor has been evicted.
    pub path: Utf8PathBuf,
    /// Options used to re-open the file.
    ///
    /// Note: this only contains the options needed to re-open it. For example,
    /// if a new file is created, the create flag is only used for the initial
    /// open and is stripped before the options are stored here.
    open_options: OpenOptions,
}
     278              : 
/// A possibly-stale reference to one slot of [`OPEN_FILES`].
///
/// The handle only refers to "our" file descriptor while its `tag` equals the
/// slot's current tag.
#[derive(Debug, PartialEq, Clone, Copy)]
struct SlotHandle {
    /// Index into OPEN_FILES.slots
    index: usize,

    /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
    /// been recycled and no longer contains the FD for this virtual file.
    tag: u64,
}
     288              : 
/// OPEN_FILES is the global array that holds the physical file descriptors that
/// are currently open. Each slot in the array is protected by a separate lock,
/// so that different files can be accessed independently. The lock must be held
/// in write mode to replace the slot with a different file, but read mode
/// is enough to operate on the file, whether you're reading or writing to it.
///
/// OPEN_FILES starts in an uninitialized state, and it's initialized by
/// the virtual_file::init() function. That function must be called exactly once
/// at page server startup.
static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     299              : 
/// The contents of [`OPEN_FILES`]: the descriptor slots plus the clock-sweep state.
struct OpenFiles {
    /// The slots, each of which may hold one open file descriptor.
    slots: &'static [Slot],

    /// clock arm for the clock algorithm. `find_victim_slot` advances it with
    /// `fetch_add` and reduces it modulo `slots.len()` to get a slot index.
    next: AtomicUsize,
}
     306              : 
/// One entry of [`OpenFiles::slots`].
struct Slot {
    /// The slot's contents. Held in write mode while replacing the stored file.
    inner: RwLock<SlotInner>,

    /// has this file been used since last clock sweep?
    ///
    /// The sweep in `find_victim_slot` clears it as it passes (so a recently used
    /// slot is skipped once), and it is set again when the slot is handed out.
    recently_used: AtomicBool,
}
     313              : 
/// The lock-protected state of a [`Slot`].
struct SlotInner {
    /// Counter that's incremented every time a different file is stored here.
    /// To avoid the ABA problem: a [`SlotHandle`] whose `tag` differs from this
    /// value refers to a file that has since been replaced.
    tag: u64,

    /// the underlying file, or `None` if the slot is currently empty
    file: Option<OwnedFd>,
}
     322              : 
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
///
/// Wraps a page cache write guard so that the page can be used as an owned
/// I/O buffer.
struct PageWriteGuardBuf {
    /// Write guard for the page cache slot whose memory backs this buffer.
    page: PageWriteGuard<'static>,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
// Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
// (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
    /// Pointer to the start of the page's memory.
    fn stable_ptr(&self) -> *const u8 {
        self.page.as_ptr()
    }
    /// The entire page counts as initialized (pages are zero-initialized, see above).
    fn bytes_init(&self) -> usize {
        self.page.len()
    }
    /// The buffer's capacity is the full page length.
    fn bytes_total(&self) -> usize {
        self.page.len()
    }
}
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
// hence it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
    /// Mutable pointer to the start of the page's memory.
    fn stable_mut_ptr(&mut self) -> *mut u8 {
        self.page.as_mut_ptr()
    }

    /// Bounds-checks `pos` but otherwise does nothing: the whole page is always
    /// reported as initialized.
    ///
    /// # Panics
    /// Panics if `pos` exceeds the page length.
    unsafe fn set_init(&mut self, pos: usize) {
        // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
        assert!(pos <= self.page.len());
    }
}
     354              : 
impl OpenFiles {
    /// Find a slot to use, evicting an existing file descriptor if needed.
    ///
    /// On return, we hold a write lock on the slot, its 'tag' has been
    /// incremented, and recently_used has been set. It's all ready for reuse.
    ///
    /// NOTE(review): `% num_slots` panics if `slots` is empty; presumably
    /// `init()` always creates at least one slot — confirm.
    async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
        //
        // Run the clock algorithm to find a slot to replace.
        //
        let num_slots = self.slots.len();
        // Number of slots skipped so far (recently used, or lock unavailable).
        let mut retries = 0;
        let mut slot;
        let mut slot_guard;
        let index;
        loop {
            let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
            slot = &self.slots[next];

            // If the recently_used flag on this slot is set, continue the clock
            // sweep. Otherwise try to use this slot. If we cannot acquire the
            // lock, also continue the clock sweep.
            //
            // We only continue in this manner for a while, though. If we loop
            // through the array twice without finding a victim, just pick the
            // next slot and wait until we can reuse it. This way, we avoid
            // spinning in the extreme case that all the slots are busy with an
            // I/O operation.
            if retries < num_slots * 2 {
                // Clearing the flag as we pass gives a recently used slot a
                // second chance: it is skipped now but eligible next time round.
                if !slot.recently_used.swap(false, Ordering::Release) {
                    // Non-blocking attempt; a busy slot is simply skipped.
                    if let Ok(guard) = slot.inner.try_write() {
                        slot_guard = guard;
                        index = next;
                        break;
                    }
                }
                retries += 1;
            } else {
                // Give up on sweeping and block until this slot's lock is free.
                slot_guard = slot.inner.write().await;
                index = next;
                break;
            }
        }

        //
        // We now have the victim slot locked. If it was in use previously, close the
        // old file.
        //
        if let Some(old_file) = slot_guard.file.take() {
            // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
            // distinguish the two.
            STORAGE_IO_TIME_METRIC
                .get(StorageIoOperation::CloseByReplace)
                .observe_closure_duration(|| drop(old_file));
        }

        // Prepare the slot for reuse and return it. Bumping the tag invalidates
        // every existing SlotHandle that still points at this slot.
        slot_guard.tag += 1;
        slot.recently_used.store(true, Ordering::Relaxed);
        (
            SlotHandle {
                index,
                tag: slot_guard.tag,
            },
            slot_guard,
        )
    }
}
     422              : 
     423              : /// Identify error types that should alwways terminate the process.  Other
     424              : /// error types may be elegible for retry.
     425           36 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     426              :     use nix::errno::Errno::*;
     427           36 :     match e.raw_os_error().map(nix::errno::from_i32) {
     428              :         Some(EIO) => {
     429              :             // Terminate on EIO because we no longer trust the device to store
     430              :             // data safely, or to uphold persistence guarantees on fsync.
     431            0 :             true
     432              :         }
     433              :         Some(EROFS) => {
     434              :             // Terminate on EROFS because a filesystem is usually remounted
     435              :             // readonly when it has experienced some critical issue, so the same
     436              :             // logic as EIO applies.
     437            0 :             true
     438              :         }
     439              :         Some(EACCES) => {
     440              :             // Terminate on EACCESS because we should always have permissions
     441              :             // for our own data dir: if we don't, then we can't do our job and
     442              :             // need administrative intervention to fix permissions.  Terminating
     443              :             // is the best way to make sure we stop cleanly rather than going
     444              :             // into infinite retry loops, and will make it clear to the outside
     445              :             // world that we need help.
     446            0 :             true
     447              :         }
     448              :         _ => {
     449              :             // Treat all other local file I/O errors are retryable.  This includes:
     450              :             // - ENOSPC: we stay up and wait for eviction to free some space
     451              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     452              :             // - WriteZero, Interrupted: these are used internally VirtualFile
     453           36 :             false
     454              :         }
     455              :     }
     456           36 : }
     457              : 
     458              : /// Call this when the local filesystem gives us an error with an external
     459              : /// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
     460              : /// bad storage or bad configuration, and we can't fix that from inside
     461              : /// a running process.
     462            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     463            0 :     let backtrace = std::backtrace::Backtrace::force_capture();
     464            0 :     tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
     465            0 :     std::process::abort();
     466              : }
     467              : 
/// Escalate I/O errors that we cannot (or should not) recover from into a
/// process abort via [`on_fatal_io_error`].
pub(crate) trait MaybeFatalIo<T> {
    /// Abort if the error is fatal per [`is_fatal_io_error`]; otherwise pass the
    /// result through. `context` is included in the abort log message.
    fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
    /// Abort on any error; otherwise return the success value. `context` is
    /// included in the abort log message.
    fn fatal_err(self, context: &str) -> T;
}
     472              : 
     473              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     474              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     475              :     ///
     476              :     /// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but
     477              :     /// not on ENOSPC.
     478      6993169 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     479      6993169 :         if let Err(e) = &self {
     480           36 :             if is_fatal_io_error(e) {
     481            0 :                 on_fatal_io_error(e, context);
     482           36 :             }
     483      6993133 :         }
     484      6993169 :         self
     485      6993169 :     }
     486              : 
     487              :     /// Terminate the process on any I/O error.
     488              :     ///
     489              :     /// This is appropriate for reads on files that we know exist: they should always work.
     490        12348 :     fn fatal_err(self, context: &str) -> T {
     491        12348 :         match self {
     492        12348 :             Ok(v) => v,
     493            0 :             Err(e) => {
     494            0 :                 on_fatal_io_error(&e, context);
     495              :             }
     496              :         }
     497        12348 :     }
     498              : }
     499              : 
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
///
/// The body is evaluated as an expression in the caller's scope, so it may
/// contain `.await` and `?`. If the body exits early (e.g. through `?`), no
/// observation is recorded for `$op`.
///
/// `Instant` and `STORAGE_IO_TIME_METRIC` are resolved at the call site, so both
/// must be in scope wherever the macro is invoked.
macro_rules! observe_duration {
    ($op:expr, $($body:tt)*) => {{
        let instant = Instant::now();
        let result = $($body)*;
        let elapsed = instant.elapsed().as_secs_f64();
        STORAGE_IO_TIME_METRIC
            .get($op)
            .observe(elapsed);
        result
    }}
}
     515              : 
/// Obtain the file via `$this.lock_file().await?`, bind it to `$ident`, and
/// evaluate the body, timing only the body under `$op` with `observe_duration!`.
///
/// Use the `|mut ident|` form when the body needs a mutable binding. An error
/// from `lock_file()` is propagated with `?` before timing starts.
macro_rules! with_file {
    ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
        let $ident = $this.lock_file().await?;
        observe_duration!($op, $($body)*)
    }};
    ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
        let mut $ident = $this.lock_file().await?;
        observe_duration!($op, $($body)*)
    }};
}
     526              : 
     527              : impl VirtualFileInner {
     528              :     /// Open a file in read-only mode. Like File::open.
     529         6096 :     pub async fn open<P: AsRef<Utf8Path>>(
     530         6096 :         path: P,
     531         6096 :         ctx: &RequestContext,
     532         6096 :     ) -> Result<VirtualFileInner, std::io::Error> {
     533         6096 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
     534         6096 :     }
     535              : 
     536              :     /// Open a file with given options.
     537              :     ///
     538              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     539              :     /// they will be applied also when the file is subsequently re-opened, not only
     540              :     /// on the first time. Make sure that's sane!
     541        36972 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     542        36972 :         path: P,
     543        36972 :         open_options: OpenOptions,
     544        36972 :         _ctx: &RequestContext,
     545        36972 :     ) -> Result<VirtualFileInner, std::io::Error> {
     546        36972 :         let path = path.as_ref();
     547        36972 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     548              : 
     549              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     550              :         // where our caller doesn't get to use the returned VirtualFile before its
     551              :         // slot gets re-used by someone else.
     552        36972 :         let file = observe_duration!(StorageIoOperation::Open, {
     553        36972 :             open_options.open(path.as_std_path()).await?
     554              :         });
     555              : 
     556              :         // Strip all options other than read and write.
     557              :         //
     558              :         // It would perhaps be nicer to check just for the read and write flags
     559              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     560              :         // only to set them.
     561        36972 :         let mut reopen_options = open_options.clone();
     562        36972 :         reopen_options.create(false);
     563        36972 :         reopen_options.create_new(false);
     564        36972 :         reopen_options.truncate(false);
     565        36972 : 
     566        36972 :         let vfile = VirtualFileInner {
     567        36972 :             handle: RwLock::new(handle),
     568        36972 :             path: path.to_owned(),
     569        36972 :             open_options: reopen_options,
     570        36972 :         };
     571        36972 : 
     572        36972 :         // TODO: Under pressure, it's likely the slot will get re-used and
     573        36972 :         // the underlying file closed before they get around to using it.
     574        36972 :         // => https://github.com/neondatabase/neon/issues/6065
     575        36972 :         slot_guard.file.replace(file);
     576        36972 : 
     577        36972 :         Ok(vfile)
     578        36972 :     }
     579              : 
     580              :     /// Async version of [`::utils::crashsafe::overwrite`].
     581              :     ///
     582              :     /// # NB:
     583              :     ///
     584              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     585              :     /// it did at an earlier time.
     586              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     587          168 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     588          168 :         final_path: Utf8PathBuf,
     589          168 :         tmp_path: Utf8PathBuf,
     590          168 :         content: B,
     591          168 :     ) -> std::io::Result<()> {
     592          168 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     593          168 :         // See https://github.com/neondatabase/neon/issues/6663
     594          168 : 
     595          168 :         tokio::task::spawn_blocking(move || {
     596          168 :             let slice_storage;
     597          168 :             let content_len = content.bytes_init();
     598          168 :             let content = if content.bytes_init() > 0 {
     599          168 :                 slice_storage = Some(content.slice(0..content_len));
     600          168 :                 slice_storage.as_deref().expect("just set it to Some()")
     601              :             } else {
     602            0 :                 &[]
     603              :             };
     604          168 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     605          168 :                 .maybe_fatal_err("crashsafe_overwrite")
     606          168 :         })
     607          168 :         .await
     608          168 :         .expect("blocking task is never aborted")
     609          168 :     }
     610              : 
     611              :     /// Call File::sync_all() on the underlying File.
     612        17124 :     pub async fn sync_all(&self) -> Result<(), Error> {
     613        17124 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     614        17124 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     615        17124 :             res.maybe_fatal_err("sync_all")
     616              :         })
     617        17124 :     }
     618              : 
     619              :     /// Call File::sync_data() on the underlying File.
     620            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     621            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     622            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     623            0 :             res.maybe_fatal_err("sync_data")
     624              :         })
     625            0 :     }
     626              : 
     627        10992 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     628        10992 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     629        10992 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     630        10992 :             res
     631              :         })
     632        10992 :     }
     633              : 
     634           84 :     pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
     635           84 :         with_file!(self, StorageIoOperation::SetLen, |file_guard| {
     636           84 :             let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
     637           84 :             res.maybe_fatal_err("set_len")
     638              :         })
     639           84 :     }
     640              : 
     641              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     642              :     /// opens it and evicts some other File if necessary. The passed parameter is
     643              :     /// assumed to be a function available for the physical `File`.
     644              :     ///
     645              :     /// We are doing it via a macro as Rust doesn't support async closures that
     646              :     /// take on parameters with lifetimes.
     647      3553830 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     648      3553830 :         let open_files = get_open_files();
     649              : 
     650      1123247 :         let mut handle_guard = {
     651              :             // Read the cached slot handle, and see if the slot that it points to still
     652              :             // contains our File.
     653              :             //
     654              :             // We only need to hold the handle lock while we read the current handle. If
     655              :             // another thread closes the file and recycles the slot for a different file,
     656              :             // we will notice that the handle we read is no longer valid and retry.
     657      3553830 :             let mut handle = *self.handle.read().await;
     658              :             loop {
     659              :                 // Check if the slot contains our File
     660              :                 {
     661      4115354 :                     let slot = &open_files.slots[handle.index];
     662      4115354 :                     let slot_guard = slot.inner.read().await;
     663      4115354 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     664              :                         // Found a cached file descriptor.
     665      2430583 :                         slot.recently_used.store(true, Ordering::Relaxed);
     666      2430583 :                         return Ok(FileGuard { slot_guard });
     667      1684771 :                     }
     668              :                 }
     669              : 
     670              :                 // The slot didn't contain our File. We will have to open it ourselves,
     671              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     672              :                 // that no other thread will try to concurrently open the same file.
     673      1684771 :                 let handle_guard = self.handle.write().await;
     674              : 
     675              :                 // If another thread changed the handle while we were not holding the lock,
     676              :                 // then the handle might now be valid again. Loop back to retry.
     677      1684771 :                 if *handle_guard != handle {
     678       561524 :                     handle = *handle_guard;
     679       561524 :                     continue;
     680      1123247 :                 }
     681      1123247 :                 break handle_guard;
     682              :             }
     683              :         };
     684              : 
     685              :         // We need to open the file ourselves. The handle in the VirtualFile is
     686              :         // now locked in write-mode. Find a free slot to put it in.
     687      1123247 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     688              : 
     689              :         // Re-open the physical file.
     690              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     691              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     692              :         // of the virtual file descriptor cache.
     693      1123247 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     694      1123247 :             self.open_options.open(self.path.as_std_path()).await?
     695              :         });
     696              : 
     697              :         // Store the File in the slot and update the handle in the VirtualFile
     698              :         // to point to it.
     699      1123247 :         slot_guard.file.replace(file);
     700      1123247 : 
     701      1123247 :         *handle_guard = handle;
     702      1123247 : 
     703      1123247 :         Ok(FileGuard {
     704      1123247 :             slot_guard: slot_guard.downgrade(),
     705      1123247 :         })
     706      3553830 :     }
     707              : 
     708              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     709              :     ///
     710              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     711      3288427 :     pub async fn read_exact_at<Buf>(
     712      3288427 :         &self,
     713      3288427 :         slice: Slice<Buf>,
     714      3288427 :         offset: u64,
     715      3288427 :         ctx: &RequestContext,
     716      3288427 :     ) -> Result<Slice<Buf>, Error>
     717      3288427 :     where
     718      3288427 :         Buf: IoBufAlignedMut + Send,
     719      3288427 :     {
     720      3288427 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     721      3288427 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     722              :         } else {
     723            0 :             None
     724              :         };
     725              : 
     726      3288427 :         let original_bounds = slice.bounds();
     727      3288427 :         let (buf, res) =
     728      3288427 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     729      3288427 :         let res = res.map(|_| buf.slice(original_bounds));
     730              : 
     731      3288427 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     732      3288427 :             if let Ok(slice) = &res {
     733      3288415 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     734      3288415 :                 assert_eq!(original_bounds, returned_bounds);
     735           12 :             }
     736            0 :         }
     737              : 
     738      3288427 :         res
     739      3288427 :     }
     740              : 
     741              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     742       185181 :     pub async fn read_exact_at_page(
     743       185181 :         &self,
     744       185181 :         page: PageWriteGuard<'static>,
     745       185181 :         offset: u64,
     746       185181 :         ctx: &RequestContext,
     747       185181 :     ) -> Result<PageWriteGuard<'static>, Error> {
     748       185181 :         let buf = PageWriteGuardBuf { page }.slice_full();
     749       185181 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     750       185181 :         self.read_exact_at(buf, offset, ctx)
     751       185181 :             .await
     752       185181 :             .map(|slice| slice.into_inner().page)
     753       185181 :     }
     754              : 
     755              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     756       237203 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     757       237203 :         &self,
     758       237203 :         buf: FullSlice<Buf>,
     759       237203 :         mut offset: u64,
     760       237203 :         ctx: &RequestContext,
     761       237203 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     762       237203 :         let buf = buf.into_raw_slice();
     763       237203 :         let bounds = buf.bounds();
     764       237203 :         let restore =
     765       237203 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     766       237203 :         let mut buf = buf;
     767       474394 :         while !buf.is_empty() {
     768       237203 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     769       237203 :             buf = tmp.into_raw_slice();
     770           12 :             match res {
     771              :                 Ok(0) => {
     772            0 :                     return (
     773            0 :                         restore(buf),
     774            0 :                         Err(Error::new(
     775            0 :                             std::io::ErrorKind::WriteZero,
     776            0 :                             "failed to write whole buffer",
     777            0 :                         )),
     778            0 :                     );
     779              :                 }
     780       237191 :                 Ok(n) => {
     781       237191 :                     buf = buf.slice(n..);
     782       237191 :                     offset += n as u64;
     783       237191 :                 }
     784           12 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     785           12 :                 Err(e) => return (restore(buf), Err(e)),
     786              :             }
     787              :         }
     788       237191 :         (restore(buf), Ok(()))
     789       237203 :     }
     790              : 
     791      3288427 :     pub(super) async fn read_at<Buf>(
     792      3288427 :         &self,
     793      3288427 :         buf: tokio_epoll_uring::Slice<Buf>,
     794      3288427 :         offset: u64,
     795      3288427 :         ctx: &RequestContext,
     796      3288427 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     797      3288427 :     where
     798      3288427 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     799      3288427 :     {
     800      3288427 :         let file_guard = match self
     801      3288427 :             .lock_file()
     802      3288427 :             .await
     803      3288427 :             .maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
     804              :         {
     805      3288427 :             Ok(file_guard) => file_guard,
     806            0 :             Err(e) => return (buf, Err(e)),
     807              :         };
     808              : 
     809      3288427 :         observe_duration!(StorageIoOperation::Read, {
     810      3288427 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     811      3288427 :             let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
     812      3288427 :             if let Ok(size) = res {
     813      3288415 :                 ctx.io_size_metrics().read.add(size.into_u64());
     814      3288415 :             }
     815      3288427 :             (buf, res)
     816              :         })
     817      3288427 :     }
     818              : 
     819       237203 :     async fn write_at<B: IoBuf + Send>(
     820       237203 :         &self,
     821       237203 :         buf: FullSlice<B>,
     822       237203 :         offset: u64,
     823       237203 :         ctx: &RequestContext,
     824       237203 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     825       237203 :         let file_guard = match self.lock_file().await {
     826       237203 :             Ok(file_guard) => file_guard,
     827            0 :             Err(e) => return (buf, Err(e)),
     828              :         };
     829       237203 :         observe_duration!(StorageIoOperation::Write, {
     830       237203 :             let ((_file_guard, buf), result) =
     831       237203 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     832       237203 :             let result = result.maybe_fatal_err("write_at");
     833       237203 :             if let Ok(size) = result {
     834       237191 :                 ctx.io_size_metrics().write.add(size.into_u64());
     835       237191 :             }
     836       237203 :             (buf, result)
     837              :         })
     838       237203 :     }
     839              : }
     840              : 
     841              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     842      3288475 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     843      3288475 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     844      3288475 :     mut offset: u64,
     845      3288475 :     mut read_at: F,
     846      3288475 : ) -> (Buf, std::io::Result<()>)
     847      3288475 : where
     848      3288475 :     Buf: IoBufMut + Send,
     849      3288475 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     850      3288475 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     851      3288475 : {
     852      6576950 :     while buf.bytes_total() != 0 {
     853              :         let res;
     854      3288499 :         (buf, res) = read_at(buf, offset).await;
     855           12 :         match res {
     856           12 :             Ok(0) => break,
     857      3288475 :             Ok(n) => {
     858      3288475 :                 buf = buf.slice(n..);
     859      3288475 :                 offset += n as u64;
     860      3288475 :             }
     861           12 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     862           12 :             Err(e) => return (buf.into_inner(), Err(e)),
     863              :         }
     864              :     }
     865              :     // NB: don't use `buf.is_empty()` here; it is from the
     866              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     867              :     // returned by it only covers the initialized portion of `buf`.
     868              :     // Whereas we're interested in ensuring that we filled the entire
     869              :     // buffer that the user passed in.
     870      3288463 :     if buf.bytes_total() != 0 {
     871           12 :         (
     872           12 :             buf.into_inner(),
     873           12 :             Err(std::io::Error::new(
     874           12 :                 std::io::ErrorKind::UnexpectedEof,
     875           12 :                 "failed to fill whole buffer",
     876           12 :             )),
     877           12 :         )
     878              :     } else {
     879      3288451 :         assert_eq!(buf.len(), buf.bytes_total());
     880      3288451 :         (buf.into_inner(), Ok(()))
     881              :     }
     882      3288475 : }
     883              : 
     884              : #[cfg(test)]
     885              : mod test_read_exact_at_impl {
     886              : 
     887              :     use std::collections::VecDeque;
     888              :     use std::sync::Arc;
     889              : 
     890              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     891              : 
     892              :     use super::read_exact_at_impl;
     893              : 
     894              :     struct Expectation {
     895              :         offset: u64,
     896              :         bytes_total: usize,
     897              :         result: std::io::Result<Vec<u8>>,
     898              :     }
     899              :     struct MockReadAt {
     900              :         expectations: VecDeque<Expectation>,
     901              :     }
     902              : 
     903              :     impl MockReadAt {
     904           72 :         async fn read_at(
     905           72 :             &mut self,
     906           72 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     907           72 :             offset: u64,
     908           72 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     909           72 :             let exp = self
     910           72 :                 .expectations
     911           72 :                 .pop_front()
     912           72 :                 .expect("read_at called but we have no expectations left");
     913           72 :             assert_eq!(exp.offset, offset);
     914           72 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     915           72 :             match exp.result {
     916           72 :                 Ok(bytes) => {
     917           72 :                     assert!(bytes.len() <= buf.bytes_total());
     918           72 :                     buf.put_slice(&bytes);
     919           72 :                     (buf, Ok(bytes.len()))
     920              :                 }
     921            0 :                 Err(e) => (buf, Err(e)),
     922              :             }
     923           72 :         }
     924              :     }
     925              : 
     926              :     impl Drop for MockReadAt {
     927           48 :         fn drop(&mut self) {
     928           48 :             assert_eq!(self.expectations.len(), 0);
     929           48 :         }
     930              :     }
     931              : 
     932              :     #[tokio::test]
     933           12 :     async fn test_basic() {
     934           12 :         let buf = Vec::with_capacity(5).slice_full();
     935           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     936           12 :             expectations: VecDeque::from(vec![Expectation {
     937           12 :                 offset: 0,
     938           12 :                 bytes_total: 5,
     939           12 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     940           12 :             }]),
     941           12 :         }));
     942           12 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     943           12 :             let mock_read_at = Arc::clone(&mock_read_at);
     944           12 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     945           12 :         })
     946           12 :         .await;
     947           12 :         assert!(res.is_ok());
     948           12 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     949           12 :     }
     950              : 
     951              :     #[tokio::test]
     952           12 :     async fn test_empty_buf_issues_no_syscall() {
     953           12 :         let buf = Vec::new().slice_full();
     954           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     955           12 :             expectations: VecDeque::new(),
     956           12 :         }));
     957           12 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     958            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     959           12 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     960           12 :         })
     961           12 :         .await;
     962           12 :         assert!(res.is_ok());
     963           12 :     }
     964              : 
     965              :     #[tokio::test]
     966           12 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     967           12 :         let buf = Vec::with_capacity(4).slice_full();
     968           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     969           12 :             expectations: VecDeque::from(vec![
     970           12 :                 Expectation {
     971           12 :                     offset: 0,
     972           12 :                     bytes_total: 4,
     973           12 :                     result: Ok(vec![b'a', b'b']),
     974           12 :                 },
     975           12 :                 Expectation {
     976           12 :                     offset: 2,
     977           12 :                     bytes_total: 2,
     978           12 :                     result: Ok(vec![b'c', b'd']),
     979           12 :                 },
     980           12 :             ]),
     981           12 :         }));
     982           24 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     983           24 :             let mock_read_at = Arc::clone(&mock_read_at);
     984           24 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     985           24 :         })
     986           12 :         .await;
     987           12 :         assert!(res.is_ok());
     988           12 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     989           12 :     }
     990              : 
     991              :     #[tokio::test]
     992           12 :     async fn test_eof_before_buffer_full() {
     993           12 :         let buf = Vec::with_capacity(3).slice_full();
     994           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     995           12 :             expectations: VecDeque::from(vec![
     996           12 :                 Expectation {
     997           12 :                     offset: 0,
     998           12 :                     bytes_total: 3,
     999           12 :                     result: Ok(vec![b'a']),
    1000           12 :                 },
    1001           12 :                 Expectation {
    1002           12 :                     offset: 1,
    1003           12 :                     bytes_total: 2,
    1004           12 :                     result: Ok(vec![b'b']),
    1005           12 :                 },
    1006           12 :                 Expectation {
    1007           12 :                     offset: 2,
    1008           12 :                     bytes_total: 1,
    1009           12 :                     result: Ok(vec![]),
    1010           12 :                 },
    1011           12 :             ]),
    1012           12 :         }));
    1013           36 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1014           36 :             let mock_read_at = Arc::clone(&mock_read_at);
    1015           36 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1016           36 :         })
    1017           12 :         .await;
    1018           12 :         let Err(err) = res else {
    1019           12 :             panic!("should return an error");
    1020           12 :         };
    1021           12 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    1022           12 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
    1023           12 :         // buffer contents on error are unspecified
    1024           12 :     }
    1025              : }
    1026              : 
    1027              : struct FileGuard {
    1028              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
    1029              : }
    1030              : 
    1031              : impl AsRef<OwnedFd> for FileGuard {
    1032      3553830 :     fn as_ref(&self) -> &OwnedFd {
    1033      3553830 :         // This unwrap is safe because we only create `FileGuard`s
    1034      3553830 :         // if we know that the file is Some.
    1035      3553830 :         self.slot_guard.file.as_ref().unwrap()
    1036      3553830 :     }
    1037              : }
    1038              : 
    1039              : impl FileGuard {
    1040              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1041      1776832 :     fn with_std_file<F, R>(&self, with: F) -> R
    1042      1776832 :     where
    1043      1776832 :         F: FnOnce(&File) -> R,
    1044      1776832 :     {
    1045      1776832 :         // SAFETY:
    1046      1776832 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1047      1776832 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
    1048      1776832 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1049      1776832 :         let res = with(&file);
    1050      1776832 :         let _ = file.into_raw_fd();
    1051      1776832 :         res
    1052      1776832 :     }
    1053              : }
    1054              : 
    1055              : impl tokio_epoll_uring::IoFd for FileGuard {
    1056      1776998 :     unsafe fn as_fd(&self) -> RawFd {
    1057      1776998 :         let owned_fd: &OwnedFd = self.as_ref();
    1058      1776998 :         owned_fd.as_raw_fd()
    1059      1776998 :     }
    1060              : }
    1061              : 
    1062              : #[cfg(test)]
    1063              : impl VirtualFile {
    1064        62748 :     pub(crate) async fn read_blk(
    1065        62748 :         &self,
    1066        62748 :         blknum: u32,
    1067        62748 :         ctx: &RequestContext,
    1068        62748 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1069        62748 :         self.inner.read_blk(blknum, ctx).await
    1070        62748 :     }
    1071              : }
    1072              : 
    1073              : #[cfg(test)]
    1074              : impl VirtualFileInner {
    1075        62748 :     pub(crate) async fn read_blk(
    1076        62748 :         &self,
    1077        62748 :         blknum: u32,
    1078        62748 :         ctx: &RequestContext,
    1079        62748 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1080              :         use crate::page_cache::PAGE_SZ;
    1081        62748 :         let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    1082        62748 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1083        62748 :         let slice = self
    1084        62748 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1085        62748 :             .await?;
    1086        62748 :         Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
    1087        62748 :             slice.into_inner(),
    1088        62748 :         ))
    1089        62748 :     }
    1090              : }
    1091              : 
    1092              : impl Drop for VirtualFileInner {
    1093              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1094        31952 :     fn drop(&mut self) {
    1095        31952 :         let handle = self.handle.get_mut();
    1096              : 
    1097        31952 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1098        31952 :             if slot_guard.tag == tag {
    1099        28486 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1100              :                 // there is also operation "close-by-replace" for closes done on eviction for
    1101              :                 // comparison.
    1102        28486 :                 if let Some(fd) = slot_guard.file.take() {
    1103        28486 :                     STORAGE_IO_TIME_METRIC
    1104        28486 :                         .get(StorageIoOperation::Close)
    1105        28486 :                         .observe_closure_duration(|| drop(fd));
    1106        28486 :                 }
    1107         3466 :             }
    1108        31952 :         }
    1109              : 
    1110              :         // We don't have async drop so we cannot directly await the lock here.
    1111              :         // Instead, first do a best-effort attempt at closing the underlying
    1112              :         // file descriptor by using `try_write`, and if that fails, spawn
    1113              :         // a tokio task to do it asynchronously: we just want it to be
    1114              :         // cleaned up eventually.
    1115              :         // Most of the time, the `try_lock` should succeed though,
    1116              :         // as we have `&mut self` access. In other words, if the slot
    1117              :         // is still occupied by our file, there should be no access from
    1118              :         // other I/O operations; the only other possible place to lock
    1119              :         // the slot is the lock algorithm looking for free slots.
    1120        31952 :         let slot = &get_open_files().slots[handle.index];
    1121        31952 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1122        31948 :             clean_slot(slot, slot_guard, handle.tag);
    1123        31948 :         } else {
    1124            4 :             let tag = handle.tag;
    1125            4 :             tokio::spawn(async move {
    1126            4 :                 let slot_guard = slot.inner.write().await;
    1127            4 :                 clean_slot(slot, slot_guard, tag);
    1128            4 :             });
    1129            4 :         };
    1130        31952 :     }
    1131              : }
    1132              : 
    1133              : impl OwnedAsyncWriter for VirtualFile {
    1134            0 :     async fn write_all_at<Buf: IoBufAligned + Send>(
    1135            0 :         &self,
    1136            0 :         buf: FullSlice<Buf>,
    1137            0 :         offset: u64,
    1138            0 :         ctx: &RequestContext,
    1139            0 :     ) -> (FullSlice<Buf>, std::io::Result<()>) {
    1140            0 :         VirtualFile::write_all_at(self, buf, offset, ctx).await
    1141            0 :     }
    1142            0 :     async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
    1143            0 :         VirtualFile::set_len(self, len, ctx).await
    1144            0 :     }
    1145              : }
    1146              : 
    1147              : impl OpenFiles {
    1148         1452 :     fn new(num_slots: usize) -> OpenFiles {
    1149         1452 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1150        14520 :         for _ in 0..num_slots {
    1151        14520 :             let slot = Slot {
    1152        14520 :                 recently_used: AtomicBool::new(false),
    1153        14520 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1154        14520 :             };
    1155        14520 :             slots.push(slot);
    1156        14520 :         }
    1157              : 
    1158         1452 :         OpenFiles {
    1159         1452 :             next: AtomicUsize::new(0),
    1160         1452 :             slots: Box::leak(slots),
    1161         1452 :         }
    1162         1452 :     }
    1163              : }
    1164              : 
    1165              : ///
    1166              : /// Initialize the virtual file module. This must be called once at page
    1167              : /// server startup.
    1168              : ///
    1169              : #[cfg(not(test))]
    1170            0 : pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: SyncMode) {
    1171            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1172            0 :         panic!("virtual_file::init called twice");
    1173            0 :     }
    1174            0 :     set_io_mode(mode);
    1175            0 :     io_engine::init(engine);
    1176            0 :     SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
    1177            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1178            0 : }
    1179              : 
    1180              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1181              : 
    1182              : // Get a handle to the global slots array.
    1183      3622754 : fn get_open_files() -> &'static OpenFiles {
    1184      3622754 :     //
    1185      3622754 :     // In unit tests, page server startup doesn't happen and no one calls
    1186      3622754 :     // virtual_file::init(). Initialize it here, with a small array.
    1187      3622754 :     //
    1188      3622754 :     // This applies to the virtual file tests below, but all other unit
    1189      3622754 :     // tests too, so the virtual file facility is always usable in
    1190      3622754 :     // unit tests.
    1191      3622754 :     //
    1192      3622754 :     if cfg!(test) {
    1193      3622754 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1194              :     } else {
    1195            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1196              :     }
    1197      3622754 : }
    1198              : 
    1199              : /// Gets the io buffer alignment.
    1200            0 : pub(crate) const fn get_io_buffer_alignment() -> usize {
    1201            0 :     DEFAULT_IO_BUFFER_ALIGNMENT
    1202            0 : }
    1203              : 
    1204              : pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
    1205              : pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
    1206              : pub(crate) type IoPageSlice<'a> =
    1207              :     AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
    1208              : 
    1209         1416 : static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
    1210              : 
    1211            0 : pub fn set_io_mode(mode: IoMode) {
    1212            0 :     IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
    1213            0 : }
    1214              : 
    1215        29676 : pub(crate) fn get_io_mode() -> IoMode {
    1216        29676 :     IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
    1217        29676 : }
    1218              : 
    1219              : static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
    1220              : 
    1221              : #[cfg(test)]
    1222              : mod tests {
    1223              :     use std::os::unix::fs::FileExt;
    1224              :     use std::sync::Arc;
    1225              : 
    1226              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1227              :     use owned_buffers_io::slice::SliceMutExt;
    1228              :     use rand::seq::SliceRandom;
    1229              :     use rand::{Rng, thread_rng};
    1230              : 
    1231              :     use super::*;
    1232              :     use crate::context::DownloadBehavior;
    1233              :     use crate::task_mgr::TaskKind;
    1234              : 
    1235              :     enum MaybeVirtualFile {
    1236              :         VirtualFile(VirtualFile),
    1237              :         File(File),
    1238              :     }
    1239              : 
    1240              :     impl From<VirtualFile> for MaybeVirtualFile {
    1241           36 :         fn from(vf: VirtualFile) -> Self {
    1242           36 :             MaybeVirtualFile::VirtualFile(vf)
    1243           36 :         }
    1244              :     }
    1245              : 
    1246              :     impl MaybeVirtualFile {
    1247         4932 :         async fn read_exact_at(
    1248         4932 :             &self,
    1249         4932 :             mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
    1250         4932 :             offset: u64,
    1251         4932 :             ctx: &RequestContext,
    1252         4932 :         ) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
    1253         4932 :             match self {
    1254         2484 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1255         2448 :                 MaybeVirtualFile::File(file) => {
    1256         2448 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1257         2448 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1258              :                 }
    1259              :             }
    1260         4932 :         }
    1261           96 :         async fn write_all_at<Buf: IoBufAligned + Send>(
    1262           96 :             &self,
    1263           96 :             buf: FullSlice<Buf>,
    1264           96 :             offset: u64,
    1265           96 :             ctx: &RequestContext,
    1266           96 :         ) -> Result<(), Error> {
    1267           96 :             match self {
    1268           48 :                 MaybeVirtualFile::VirtualFile(file) => {
    1269           48 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1270           48 :                     res
    1271              :                 }
    1272           48 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1273              :             }
    1274           96 :         }
    1275              : 
    1276              :         // Helper function to slurp a portion of a file into a string
    1277         4932 :         async fn read_string_at(
    1278         4932 :             &mut self,
    1279         4932 :             pos: u64,
    1280         4932 :             len: usize,
    1281         4932 :             ctx: &RequestContext,
    1282         4932 :         ) -> Result<String, Error> {
    1283         4932 :             let slice = IoBufferMut::with_capacity(len).slice_full();
    1284         4932 :             assert_eq!(slice.bytes_total(), len);
    1285         4932 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1286         4908 :             let buf = slice.into_inner();
    1287         4908 :             assert_eq!(buf.len(), len);
    1288              : 
    1289         4908 :             Ok(String::from_utf8(buf.to_vec()).unwrap())
    1290         4932 :         }
    1291              :     }
    1292              : 
    1293              :     #[tokio::test]
    1294           12 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1295           12 :         // The real work is done in the test_files() helper function. This
    1296           12 :         // allows us to run the same set of tests against a native File, and
    1297           12 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1298           12 :         // but this allows us to verify that the operations return the same
    1299           12 :         // results with VirtualFiles as with native Files. (Except that with
    1300           12 :         // native files, you will run out of file descriptors if the ulimit
    1301           12 :         // is low enough.)
    1302           12 :         struct A;
    1303           12 : 
    1304           12 :         impl Adapter for A {
    1305         1236 :             async fn open(
    1306         1236 :                 path: Utf8PathBuf,
    1307         1236 :                 opts: OpenOptions,
    1308         1236 :                 ctx: &RequestContext,
    1309         1236 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1310         1236 :                 let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
    1311         1236 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1312         1236 :             }
    1313           12 :         }
    1314           12 :         test_files::<A>("virtual_files").await
    1315           12 :     }
    1316              : 
    1317              :     #[tokio::test]
    1318           12 :     async fn test_physical_files() -> anyhow::Result<()> {
    1319           12 :         struct B;
    1320           12 : 
    1321           12 :         impl Adapter for B {
    1322         1236 :             async fn open(
    1323         1236 :                 path: Utf8PathBuf,
    1324         1236 :                 opts: OpenOptions,
    1325         1236 :                 _ctx: &RequestContext,
    1326         1236 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1327           12 :                 Ok(MaybeVirtualFile::File({
    1328         1236 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1329         1236 :                     File::from(owned_fd)
    1330           12 :                 }))
    1331         1236 :             }
    1332           12 :         }
    1333           12 : 
    1334           12 :         test_files::<B>("physical_files").await
    1335           12 :     }
    1336              : 
    1337              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
    1338              :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
    1339              :     /// in trait which benefits from the new lifetime capture rules already.
    1340              :     trait Adapter {
    1341              :         async fn open(
    1342              :             path: Utf8PathBuf,
    1343              :             opts: OpenOptions,
    1344              :             ctx: &RequestContext,
    1345              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1346              :     }
    1347              : 
    1348           24 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    1349           24 :     where
    1350           24 :         A: Adapter,
    1351           24 :     {
    1352           24 :         let ctx =
    1353           24 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1354           24 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1355           24 :         std::fs::create_dir_all(&testdir)?;
    1356              : 
    1357           24 :         let path_a = testdir.join("file_a");
    1358           24 :         let mut file_a = A::open(
    1359           24 :             path_a.clone(),
    1360           24 :             OpenOptions::new()
    1361           24 :                 .write(true)
    1362           24 :                 .create(true)
    1363           24 :                 .truncate(true)
    1364           24 :                 .to_owned(),
    1365           24 :             &ctx,
    1366           24 :         )
    1367           24 :         .await?;
    1368              : 
    1369           24 :         file_a
    1370           24 :             .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
    1371           24 :             .await?;
    1372              : 
    1373              :         // cannot read from a file opened in write-only mode
    1374           24 :         let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
    1375              : 
    1376              :         // Close the file and re-open for reading
    1377           24 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
    1378              : 
    1379              :         // cannot write to a file opened in read-only mode
    1380           24 :         let _ = file_a
    1381           24 :             .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
    1382           24 :             .await
    1383           24 :             .unwrap_err();
    1384           24 : 
    1385           24 :         // Try simple read
    1386           24 :         assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
    1387              : 
    1388              :         // Create another test file, and try FileExt functions on it.
    1389           24 :         let path_b = testdir.join("file_b");
    1390           24 :         let mut file_b = A::open(
    1391           24 :             path_b.clone(),
    1392           24 :             OpenOptions::new()
    1393           24 :                 .read(true)
    1394           24 :                 .write(true)
    1395           24 :                 .create(true)
    1396           24 :                 .truncate(true)
    1397           24 :                 .to_owned(),
    1398           24 :             &ctx,
    1399           24 :         )
    1400           24 :         .await?;
    1401           24 :         file_b
    1402           24 :             .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
    1403           24 :             .await?;
    1404           24 :         file_b
    1405           24 :             .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
    1406           24 :             .await?;
    1407              : 
    1408           24 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1409              : 
    1410              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1411              :         // open the same file many times. The effect is the same.)
    1412              : 
    1413           24 :         let mut vfiles = Vec::new();
    1414         2424 :         for _ in 0..100 {
    1415         2400 :             let mut vfile = A::open(
    1416         2400 :                 path_b.clone(),
    1417         2400 :                 OpenOptions::new().read(true).to_owned(),
    1418         2400 :                 &ctx,
    1419         2400 :             )
    1420         2400 :             .await?;
    1421         2400 :             assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
    1422         2400 :             vfiles.push(vfile);
    1423              :         }
    1424              : 
    1425              :         // make sure we opened enough files to definitely cause evictions.
    1426           24 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1427              : 
    1428              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1429              :         // from it again.
    1430           24 :         assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
    1431              : 
    1432              :         // Check that all the other FDs still work too. Use them in random order for
    1433              :         // good measure.
    1434           24 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1435         2400 :         for vfile in vfiles.iter_mut() {
    1436         2400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1437              :         }
    1438              : 
    1439           24 :         Ok(())
    1440           24 :     }
    1441              : 
    1442              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1443              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1444              :     /// VirtualFile from multiple threads concurrently.
    1445              :     #[tokio::test]
    1446           12 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1447           12 :         const SIZE: usize = 8 * 1024;
    1448           12 :         const VIRTUAL_FILES: usize = 100;
    1449           12 :         const THREADS: usize = 100;
    1450           12 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1451           12 : 
    1452           12 :         let ctx =
    1453           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1454           12 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1455           12 :         std::fs::create_dir_all(&testdir)?;
    1456           12 : 
    1457           12 :         // Create a test file.
    1458           12 :         let test_file_path = testdir.join("concurrency_test_file");
    1459           12 :         {
    1460           12 :             let file = File::create(&test_file_path)?;
    1461           12 :             file.write_all_at(&SAMPLE, 0)?;
    1462           12 :         }
    1463           12 : 
    1464           12 :         // Open the file many times.
    1465           12 :         let mut files = Vec::new();
    1466         1212 :         for _ in 0..VIRTUAL_FILES {
    1467         1200 :             let f = VirtualFileInner::open_with_options(
    1468         1200 :                 &test_file_path,
    1469         1200 :                 OpenOptions::new().read(true).clone(),
    1470         1200 :                 &ctx,
    1471         1200 :             )
    1472         1200 :             .await?;
    1473         1200 :             files.push(f);
    1474           12 :         }
    1475           12 :         let files = Arc::new(files);
    1476           12 : 
    1477           12 :         // Launch many threads, and use the virtual files concurrently in random order.
    1478           12 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1479           12 :             .worker_threads(THREADS)
    1480           12 :             .thread_name("test_vfile_concurrency thread")
    1481           12 :             .build()
    1482           12 :             .unwrap();
    1483           12 :         let mut hdls = Vec::new();
    1484         1212 :         for _threadno in 0..THREADS {
    1485         1200 :             let files = files.clone();
    1486         1200 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1487         1200 :             let hdl = rt.spawn(async move {
    1488         1200 :                 let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
    1489         1200 :                 let mut rng = rand::rngs::OsRng;
    1490      1200000 :                 for _ in 1..1000 {
    1491      1198800 :                     let f = &files[rng.gen_range(0..files.len())];
    1492      1198800 :                     buf = f
    1493      1198800 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1494      1198800 :                         .await
    1495      1198800 :                         .unwrap()
    1496      1198800 :                         .into_inner();
    1497      1198800 :                     assert!(buf[..] == SAMPLE);
    1498           12 :                 }
    1499         1200 :             });
    1500         1200 :             hdls.push(hdl);
    1501         1200 :         }
    1502         1212 :         for hdl in hdls {
    1503         1200 :             hdl.await?;
    1504           12 :         }
    1505           12 :         std::mem::forget(rt);
    1506           12 : 
    1507           12 :         Ok(())
    1508           12 :     }
    1509              : 
    1510              :     #[tokio::test]
    1511           12 :     async fn test_atomic_overwrite_basic() {
    1512           12 :         let ctx =
    1513           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1514           12 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1515           12 :         std::fs::create_dir_all(&testdir).unwrap();
    1516           12 : 
    1517           12 :         let path = testdir.join("myfile");
    1518           12 :         let tmp_path = testdir.join("myfile.tmp");
    1519           12 : 
    1520           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1521           12 :             .await
    1522           12 :             .unwrap();
    1523           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1524           12 :         let post = file.read_string_at(0, 3, &ctx).await.unwrap();
    1525           12 :         assert_eq!(post, "foo");
    1526           12 :         assert!(!tmp_path.exists());
    1527           12 :         drop(file);
    1528           12 : 
    1529           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1530           12 :             .await
    1531           12 :             .unwrap();
    1532           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1533           12 :         let post = file.read_string_at(0, 3, &ctx).await.unwrap();
    1534           12 :         assert_eq!(post, "bar");
    1535           12 :         assert!(!tmp_path.exists());
    1536           12 :         drop(file);
    1537           12 :     }
    1538              : 
    1539              :     #[tokio::test]
    1540           12 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1541           12 :         let ctx =
    1542           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1543           12 :         let testdir =
    1544           12 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1545           12 :         std::fs::create_dir_all(&testdir).unwrap();
    1546           12 : 
    1547           12 :         let path = testdir.join("myfile");
    1548           12 :         let tmp_path = testdir.join("myfile.tmp");
    1549           12 : 
    1550           12 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1551           12 :         assert!(tmp_path.exists());
    1552           12 : 
    1553           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1554           12 :             .await
    1555           12 :             .unwrap();
    1556           12 : 
    1557           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1558           12 :         let post = file.read_string_at(0, 3, &ctx).await.unwrap();
    1559           12 :         assert_eq!(post, "foo");
    1560           12 :         assert!(!tmp_path.exists());
    1561           12 :         drop(file);
    1562           12 :     }
    1563              : }
        

Generated by: LCOV version 2.1-beta