LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions)
Test: 53536e7d038dd1afd98124ffab7571882048d4d5.info
Test Date: 2025-04-24 12:00:37
Coverage: Lines: 90.4 % (1013 of 1121 hit) | Functions: 86.5 % (231 of 267 hit)

            Line data    Source code
       1              : //! VirtualFile is like a normal File, but it's not bound directly to
       2              : //! a file descriptor.
       3              : //!
       4              : //! Instead, the file is opened when it's read from,
       5              : //! and if too many files are open globally in the system, least-recently
       6              : //! used ones are closed.
       7              : //!
       8              : //! To track which files have been recently used, we use the clock algorithm
       9              : //! with a 'recently_used' flag on each slot.
      10              : //!
      11              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      12              : //! src/backend/storage/file/fd.c
      13              : //!
      14              : use std::fs::File;
      15              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      16              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      17              : #[cfg(target_os = "linux")]
      18              : use std::os::unix::fs::OpenOptionsExt;
      19              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
      20              : 
      21              : use camino::{Utf8Path, Utf8PathBuf};
      22              : use once_cell::sync::OnceCell;
      23              : use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
      24              : use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
      25              : use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
      26              : use owned_buffers_io::io_buf_ext::FullSlice;
      27              : use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
      28              : pub use pageserver_api::models::virtual_file as api;
      29              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      30              : use tokio::time::Instant;
      31              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      32              : 
      33              : use crate::assert_u64_eq_usize::UsizeIsU64;
      34              : use crate::context::RequestContext;
      35              : use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
      36              : use crate::page_cache::{PAGE_SZ, PageWriteGuard};
      37              : pub(crate) mod io_engine;
      38              : pub use io_engine::{
      39              :     FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
      40              :     io_engine_for_bench,
      41              : };
      42              : mod metadata;
      43              : mod open_options;
      44              : pub(crate) use api::IoMode;
      45              : pub(crate) use io_engine::IoEngineKind;
      46              : pub(crate) use metadata::Metadata;
      47              : pub(crate) use open_options::*;
      48              : 
      49              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      50              : 
      51              : pub(crate) mod owned_buffers_io {
      52              :     //! Abstractions for IO with owned buffers.
      53              :     //!
       54              :     //! Not actually tied to [`crate::virtual_file`] specifically, but it's the primary
      55              :     //! reason we need this abstraction.
      56              :     //!
      57              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      58              :     //! but for the time being we're proving out the primitives in the neon.git repo
      59              :     //! for faster iteration.
      60              : 
      61              :     pub(crate) mod aligned_buffer;
      62              :     pub(crate) mod io_buf_aligned;
      63              :     pub(crate) mod io_buf_ext;
      64              :     pub(crate) mod slice;
      65              :     pub(crate) mod write;
      66              : }
      67              : 
      68              : #[derive(Debug)]
      69              : pub struct VirtualFile {
      70              :     inner: VirtualFileInner,
      71              :     _mode: IoMode,
      72              : }
      73              : 
      74              : impl VirtualFile {
      75              :     /// Open a file in read-only mode. Like File::open.
      76         6336 :     pub async fn open<P: AsRef<Utf8Path>>(
      77         6336 :         path: P,
      78         6336 :         ctx: &RequestContext,
      79         6336 :     ) -> Result<Self, std::io::Error> {
      80         6336 :         let inner = VirtualFileInner::open(path, ctx).await?;
      81         6336 :         Ok(VirtualFile {
      82         6336 :             inner,
      83         6336 :             _mode: IoMode::Buffered,
      84         6336 :         })
      85         6336 :     }
      86              : 
      87              :     /// Open a file in read-only mode. Like File::open.
      88              :     ///
       89              :     /// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
      90         7512 :     pub async fn open_v2<P: AsRef<Utf8Path>>(
      91         7512 :         path: P,
      92         7512 :         ctx: &RequestContext,
      93         7512 :     ) -> Result<Self, std::io::Error> {
      94         7512 :         Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
      95         7512 :     }
      96              : 
      97         9138 :     pub async fn create<P: AsRef<Utf8Path>>(
      98         9138 :         path: P,
      99         9138 :         ctx: &RequestContext,
     100         9138 :     ) -> Result<Self, std::io::Error> {
     101         9138 :         let inner = VirtualFileInner::create(path, ctx).await?;
     102         9138 :         Ok(VirtualFile {
     103         9138 :             inner,
     104         9138 :             _mode: IoMode::Buffered,
     105         9138 :         })
     106         9138 :     }
     107              : 
     108            0 :     pub async fn create_v2<P: AsRef<Utf8Path>>(
     109            0 :         path: P,
     110            0 :         ctx: &RequestContext,
     111            0 :     ) -> Result<Self, std::io::Error> {
     112            0 :         VirtualFile::open_with_options_v2(
     113            0 :             path.as_ref(),
     114            0 :             OpenOptions::new().write(true).create(true).truncate(true),
     115            0 :             ctx,
     116            0 :         )
     117            0 :         .await
     118            0 :     }
     119              : 
     120         4980 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     121         4980 :         path: P,
     122         4980 :         open_options: &OpenOptions,
     123         4980 :         ctx: &RequestContext,
     124         4980 :     ) -> Result<Self, std::io::Error> {
     125         4980 :         let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     126         4980 :         Ok(VirtualFile {
     127         4980 :             inner,
     128         4980 :             _mode: IoMode::Buffered,
     129         4980 :         })
     130         4980 :     }
     131              : 
     132        15468 :     pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
     133        15468 :         path: P,
     134        15468 :         open_options: &OpenOptions,
     135        15468 :         ctx: &RequestContext,
     136        15468 :     ) -> Result<Self, std::io::Error> {
     137        15468 :         let file = match get_io_mode() {
     138              :             IoMode::Buffered => {
     139        15468 :                 let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     140        15468 :                 VirtualFile {
     141        15468 :                     inner,
     142        15468 :                     _mode: IoMode::Buffered,
     143        15468 :                 }
     144              :             }
     145              :             #[cfg(target_os = "linux")]
     146              :             IoMode::Direct => {
     147            0 :                 let inner = VirtualFileInner::open_with_options(
     148            0 :                     path,
     149            0 :                     open_options.clone().custom_flags(nix::libc::O_DIRECT),
     150            0 :                     ctx,
     151            0 :                 )
     152            0 :                 .await?;
     153            0 :                 VirtualFile {
     154            0 :                     inner,
     155            0 :                     _mode: IoMode::Direct,
     156            0 :                 }
     157              :             }
     158              :         };
     159        15468 :         Ok(file)
     160        15468 :     }
     161              : 
     162         7200 :     pub fn path(&self) -> &Utf8Path {
     163         7200 :         self.inner.path.as_path()
     164         7200 :     }
     165              : 
     166          132 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     167          132 :         final_path: Utf8PathBuf,
     168          132 :         tmp_path: Utf8PathBuf,
     169          132 :         content: B,
     170          132 :     ) -> std::io::Result<()> {
     171          132 :         VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
     172          132 :     }
     173              : 
     174        17082 :     pub async fn sync_all(&self) -> Result<(), Error> {
     175        17082 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     176            0 :             return Ok(());
     177        17082 :         }
     178        17082 :         self.inner.sync_all().await
     179        17082 :     }
     180              : 
     181            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     182            0 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     183            0 :             return Ok(());
     184            0 :         }
     185            0 :         self.inner.sync_data().await
     186            0 :     }
     187              : 
     188        10992 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     189        10992 :         self.inner.metadata().await
     190        10992 :     }
     191              : 
     192         1620 :     pub fn remove(self) {
     193         1620 :         self.inner.remove();
     194         1620 :     }
     195              : 
     196        34668 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     197        34668 :         self.inner.seek(pos).await
     198        34668 :     }
     199              : 
     200      1839843 :     pub async fn read_exact_at<Buf>(
     201      1839843 :         &self,
     202      1839843 :         slice: Slice<Buf>,
     203      1839843 :         offset: u64,
     204      1839843 :         ctx: &RequestContext,
     205      1839843 :     ) -> Result<Slice<Buf>, Error>
     206      1839843 :     where
     207      1839843 :         Buf: IoBufAlignedMut + Send,
     208      1839843 :     {
     209      1839843 :         self.inner.read_exact_at(slice, offset, ctx).await
     210      1839843 :     }
     211              : 
     212       185697 :     pub async fn read_exact_at_page(
     213       185697 :         &self,
     214       185697 :         page: PageWriteGuard<'static>,
     215       185697 :         offset: u64,
     216       185697 :         ctx: &RequestContext,
     217       185697 :     ) -> Result<PageWriteGuard<'static>, Error> {
     218       185697 :         self.inner.read_exact_at_page(page, offset, ctx).await
     219       185697 :     }
     220              : 
     221        39906 :     pub async fn write_all_at<Buf: IoBufAligned + Send>(
     222        39906 :         &self,
     223        39906 :         buf: FullSlice<Buf>,
     224        39906 :         offset: u64,
     225        39906 :         ctx: &RequestContext,
     226        39906 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     227        39906 :         self.inner.write_all_at(buf, offset, ctx).await
     228        39906 :     }
     229              : 
     230       788927 :     pub async fn write_all<Buf: IoBuf + Send>(
     231       788927 :         &mut self,
     232       788927 :         buf: FullSlice<Buf>,
     233       788927 :         ctx: &RequestContext,
     234       788927 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     235       788927 :         self.inner.write_all(buf, ctx).await
     236       788927 :     }
     237              : 
     238         1344 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
     239         1344 :         self.inner.read_to_end(buf, ctx).await
     240         1344 :     }
     241              : 
     242            0 :     pub(crate) async fn read_to_string(
     243            0 :         &mut self,
     244            0 :         ctx: &RequestContext,
     245            0 :     ) -> Result<String, anyhow::Error> {
     246            0 :         let mut buf = Vec::new();
     247            0 :         self.read_to_end(&mut buf, ctx).await?;
     248            0 :         Ok(String::from_utf8(buf)?)
     249            0 :     }
     250              : }
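
// Illustrative sketch (editorial addition, not part of the measured source):
// a typical caller of the wrapper API above, assuming a RequestContext is at
// hand. It uses only methods defined in this impl block.
async fn dump_file_sketch(path: &Utf8Path, ctx: &RequestContext) -> anyhow::Result<String> {
    let mut file = VirtualFile::open(path, ctx).await?;
    // read_to_string drives read_to_end, which loops over 128-byte read_at calls.
    file.read_to_string(ctx).await
}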
     251              : 
     252              : /// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing
     253              : /// files. Switching this off is unsafe and only used for testing on machines
     254              : /// with slow drives.
     255              : #[repr(u8)]
     256              : pub enum SyncMode {
     257              :     Sync,
     258              :     UnsafeNoSync,
     259              : }
     260              : 
     261              : impl TryFrom<u8> for SyncMode {
     262              :     type Error = u8;
     263              : 
     264            0 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
     265            0 :         Ok(match value {
     266            0 :             v if v == (SyncMode::Sync as u8) => SyncMode::Sync,
     267            0 :             v if v == (SyncMode::UnsafeNoSync as u8) => SyncMode::UnsafeNoSync,
     268            0 :             x => return Err(x),
     269              :         })
     270            0 :     }
     271              : }
     272              : 
     273              : ///
     274              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
     275              : /// the underlying file is closed if the system is low on file descriptors,
     276              : /// and re-opened when it's accessed again.
     277              : ///
     278              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      279              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
     280              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
     281              : /// require a mutable reference, because they modify the "current position".
     282              : ///
     283              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      284              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
     285              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
     286              : /// 'tag' field is used to detect whether the handle still is valid or not.
     287              : ///
     288              : #[derive(Debug)]
     289              : pub struct VirtualFileInner {
     290              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
     291              :     /// might contain our File, or it may be empty, or it may contain a File that
     292              :     /// belongs to a different VirtualFile.
     293              :     handle: RwLock<SlotHandle>,
     294              : 
     295              :     /// Current file position
     296              :     pos: u64,
     297              : 
     298              :     /// File path and options to use to open it.
     299              :     ///
     300              :     /// Note: this only contains the options needed to re-open it. For example,
     301              :     /// if a new file is created, we only pass the create flag when it's initially
     302              :     /// opened, in the VirtualFile::create() function, and strip the flag before
     303              :     /// storing it here.
     304              :     pub path: Utf8PathBuf,
     305              :     open_options: OpenOptions,
     306              : }
     307              : 
     308              : #[derive(Debug, PartialEq, Clone, Copy)]
     309              : struct SlotHandle {
     310              :     /// Index into OPEN_FILES.slots
     311              :     index: usize,
     312              : 
     313              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     314              :     /// been recycled and no longer contains the FD for this virtual file.
     315              :     tag: u64,
     316              : }
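
// Illustrative sketch: how a cached SlotHandle is validated against the slot it
// points to. This mirrors the check in lock_file() below; a tag mismatch means
// the slot was recycled for a different file after the handle was cached.
fn slot_handle_is_valid_sketch(handle: &SlotHandle, slot_inner: &SlotInner) -> bool {
    slot_inner.tag == handle.tag && slot_inner.file.is_some()
}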
     317              : 
     318              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     319              : /// are currently open. Each slot in the array is protected by a separate lock,
     320              : /// so that different files can be accessed independently. The lock must be held
     321              : /// in write mode to replace the slot with a different file, but a read mode
     322              : /// is enough to operate on the file, whether you're reading or writing to it.
     323              : ///
     324              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     325              : /// the virtual_file::init() function. It must be called exactly once at page
     326              : /// server startup.
     327              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     328              : 
     329              : struct OpenFiles {
     330              :     slots: &'static [Slot],
     331              : 
     332              :     /// clock arm for the clock algorithm
     333              :     next: AtomicUsize,
     334              : }
     335              : 
     336              : struct Slot {
     337              :     inner: RwLock<SlotInner>,
     338              : 
     339              :     /// has this file been used since last clock sweep?
     340              :     recently_used: AtomicBool,
     341              : }
     342              : 
     343              : struct SlotInner {
     344              :     /// Counter that's incremented every time a different file is stored here.
     345              :     /// To avoid the ABA problem.
     346              :     tag: u64,
     347              : 
     348              :     /// the underlying file
     349              :     file: Option<OwnedFd>,
     350              : }
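
// Illustrative sketch (an assumption for illustration: the real initializer is
// virtual_file::init(), defined elsewhere in this file, and may differ): how a
// leaked 'static slot array with empty slots could populate OPEN_FILES.
fn init_open_files_sketch(num_slots: usize) -> &'static OpenFiles {
    let slots: &'static [Slot] = Box::leak(
        (0..num_slots)
            .map(|_| Slot {
                inner: RwLock::new(SlotInner { tag: 0, file: None }),
                recently_used: AtomicBool::new(false),
            })
            .collect::<Vec<Slot>>()
            .into_boxed_slice(),
    );
    OPEN_FILES.get_or_init(|| OpenFiles {
        slots,
        next: AtomicUsize::new(0),
    })
}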
     351              : 
     352              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     353              : struct PageWriteGuardBuf {
     354              :     page: PageWriteGuard<'static>,
     355              : }
     356              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     357              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     358              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     359              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     360              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     361       742895 :     fn stable_ptr(&self) -> *const u8 {
     362       742895 :         self.page.as_ptr()
     363       742895 :     }
     364      1392888 :     fn bytes_init(&self) -> usize {
     365      1392888 :         self.page.len()
     366      1392888 :     }
     367       557091 :     fn bytes_total(&self) -> usize {
     368       557091 :         self.page.len()
     369       557091 :     }
     370              : }
     371              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     372              : // hence it's safe to hand out the `stable_mut_ptr()`.
     373              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     374       278599 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     375       278599 :         self.page.as_mut_ptr()
     376       278599 :     }
     377              : 
     378       185697 :     unsafe fn set_init(&mut self, pos: usize) {
     379       185697 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     380       185697 :         assert!(pos <= self.page.len());
     381       185697 :     }
     382              : }
     383              : 
     384              : impl OpenFiles {
     385              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     386              :     ///
      387              :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
     388              :     /// recently_used has been set. It's all ready for reuse.
     389      1177190 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     390      1177190 :         //
     391      1177190 :         // Run the clock algorithm to find a slot to replace.
     392      1177190 :         //
     393      1177190 :         let num_slots = self.slots.len();
     394      1177190 :         let mut retries = 0;
     395              :         let mut slot;
     396              :         let mut slot_guard;
     397              :         let index;
     398              :         loop {
     399     15544028 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     400     15544028 :             slot = &self.slots[next];
     401     15544028 : 
     402     15544028 :             // If the recently_used flag on this slot is set, continue the clock
     403     15544028 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     404     15544028 :             // lock, also continue the clock sweep.
     405     15544028 :             //
     406     15544028 :             // We only continue in this manner for a while, though. If we loop
     407     15544028 :             // through the array twice without finding a victim, just pick the
     408     15544028 :             // next slot and wait until we can reuse it. This way, we avoid
     409     15544028 :             // spinning in the extreme case that all the slots are busy with an
     410     15544028 :             // I/O operation.
     411     15544028 :             if retries < num_slots * 2 {
     412     14935022 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     413     13683011 :                     if let Ok(guard) = slot.inner.try_write() {
     414       568184 :                         slot_guard = guard;
     415       568184 :                         index = next;
     416       568184 :                         break;
     417     13114827 :                     }
     418      1252011 :                 }
     419     14366838 :                 retries += 1;
     420              :             } else {
     421       609006 :                 slot_guard = slot.inner.write().await;
     422       609006 :                 index = next;
     423       609006 :                 break;
     424              :             }
     425              :         }
     426              : 
     427              :         //
     428              :         // We now have the victim slot locked. If it was in use previously, close the
     429              :         // old file.
     430              :         //
     431      1177190 :         if let Some(old_file) = slot_guard.file.take() {
     432      1146732 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     433      1146732 :             // distinguish the two.
     434      1146732 :             STORAGE_IO_TIME_METRIC
     435      1146732 :                 .get(StorageIoOperation::CloseByReplace)
     436      1146732 :                 .observe_closure_duration(|| drop(old_file));
     437      1146732 :         }
     438              : 
     439              :         // Prepare the slot for reuse and return it
     440      1177190 :         slot_guard.tag += 1;
     441      1177190 :         slot.recently_used.store(true, Ordering::Relaxed);
     442      1177190 :         (
     443      1177190 :             SlotHandle {
     444      1177190 :                 index,
     445      1177190 :                 tag: slot_guard.tag,
     446      1177190 :             },
     447      1177190 :             slot_guard,
     448      1177190 :         )
     449      1177190 :     }
     450              : }
     451              : 
      452              : /// Identify error types that should always terminate the process.  Other
      453              : /// error types may be eligible for retry.
     454           24 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     455              :     use nix::errno::Errno::*;
     456           24 :     match e.raw_os_error().map(nix::errno::from_i32) {
     457              :         Some(EIO) => {
     458              :             // Terminate on EIO because we no longer trust the device to store
     459              :             // data safely, or to uphold persistence guarantees on fsync.
     460            0 :             true
     461              :         }
     462              :         Some(EROFS) => {
     463              :             // Terminate on EROFS because a filesystem is usually remounted
     464              :             // readonly when it has experienced some critical issue, so the same
     465              :             // logic as EIO applies.
     466            0 :             true
     467              :         }
     468              :         Some(EACCES) => {
      469              :             // Terminate on EACCES because we should always have permissions
     470              :             // for our own data dir: if we don't, then we can't do our job and
     471              :             // need administrative intervention to fix permissions.  Terminating
     472              :             // is the best way to make sure we stop cleanly rather than going
     473              :             // into infinite retry loops, and will make it clear to the outside
     474              :             // world that we need help.
     475            0 :             true
     476              :         }
     477              :         _ => {
      478              :             // Treat all other local file I/O errors as retryable.  This includes:
      479              :             // - ENOSPC: we stay up and wait for eviction to free some space
      480              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
      481              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     482           24 :             false
     483              :         }
     484              :     }
     485           24 : }
     486              : 
     487              : /// Call this when the local filesystem gives us an error with an external
      488              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     489              : /// bad storage or bad configuration, and we can't fix that from inside
     490              : /// a running process.
     491            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     492            0 :     let backtrace = std::backtrace::Backtrace::force_capture();
      493            0 :     tracing::error!("Fatal I/O error: {e}: {context}\n{backtrace}");
     494            0 :     std::process::abort();
     495              : }
     496              : 
     497              : pub(crate) trait MaybeFatalIo<T> {
     498              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     499              :     fn fatal_err(self, context: &str) -> T;
     500              : }
     501              : 
     502              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     503              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     504              :     ///
      505              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     506              :     /// not on ENOSPC.
     507      7602071 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     508      7602071 :         if let Err(e) = &self {
     509           24 :             if is_fatal_io_error(e) {
     510            0 :                 on_fatal_io_error(e, context);
     511           24 :             }
     512      7602047 :         }
     513      7602071 :         self
     514      7602071 :     }
     515              : 
     516              :     /// Terminate the process on any I/O error.
     517              :     ///
     518              :     /// This is appropriate for reads on files that we know exist: they should always work.
     519        12348 :     fn fatal_err(self, context: &str) -> T {
     520        12348 :         match self {
     521        12348 :             Ok(v) => v,
     522            0 :             Err(e) => {
     523            0 :                 on_fatal_io_error(&e, context);
     524              :             }
     525              :         }
     526        12348 :     }
     527              : }
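
// Illustrative sketch: how call sites in this file choose between the two
// methods. Writes use maybe_fatal_err so retryable errors like ENOSPC pass
// through; reads of files known to exist treat any error as fatal.
fn maybe_fatal_usage_sketch(write_res: std::io::Result<usize>, read_res: std::io::Result<usize>) {
    // Aborts the process on EIO/EROFS/EACCES, returns all other errors.
    let _ = write_res.maybe_fatal_err("write layer file");
    // Aborts the process on any error.
    let _ = read_res.fatal_err("read metadata file");
}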
     528              : 
     529              : /// Observe duration for the given storage I/O operation
     530              : ///
     531              : /// Unlike `observe_closure_duration`, this supports async,
     532              : /// where "support" means that we measure wall clock time.
     533              : macro_rules! observe_duration {
     534              :     ($op:expr, $($body:tt)*) => {{
     535              :         let instant = Instant::now();
     536              :         let result = $($body)*;
     537              :         let elapsed = instant.elapsed().as_secs_f64();
     538              :         STORAGE_IO_TIME_METRIC
     539              :             .get($op)
     540              :             .observe(elapsed);
     541              :         result
     542              :     }}
     543              : }
     544              : 
     545              : macro_rules! with_file {
     546              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     547              :         let $ident = $this.lock_file().await?;
     548              :         observe_duration!($op, $($body)*)
     549              :     }};
     550              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     551              :         let mut $ident = $this.lock_file().await?;
     552              :         observe_duration!($op, $($body)*)
     553              :     }};
     554              : }
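
// Illustrative sketch: what with_file!(self, StorageIoOperation::Fsync, |g| op(g))
// expands to, written out as a function. The FD-cache lookup happens first, and
// only the I/O operation itself is timed.
async fn with_file_expanded_sketch(
    file: &VirtualFileInner,
    op: impl FnOnce(&FileGuard) -> Result<(), Error>,
) -> Result<(), Error> {
    let file_guard = file.lock_file().await?;
    let instant = Instant::now();
    let result = op(&file_guard);
    STORAGE_IO_TIME_METRIC
        .get(StorageIoOperation::Fsync)
        .observe(instant.elapsed().as_secs_f64());
    result
}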
     555              : 
     556              : impl VirtualFileInner {
     557              :     /// Open a file in read-only mode. Like File::open.
     558         6336 :     pub async fn open<P: AsRef<Utf8Path>>(
     559         6336 :         path: P,
     560         6336 :         ctx: &RequestContext,
     561         6336 :     ) -> Result<VirtualFileInner, std::io::Error> {
     562         6336 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     563         6336 :     }
     564              : 
     565              :     /// Create a new file for writing. If the file exists, it will be truncated.
     566              :     /// Like File::create.
     567         9138 :     pub async fn create<P: AsRef<Utf8Path>>(
     568         9138 :         path: P,
     569         9138 :         ctx: &RequestContext,
     570         9138 :     ) -> Result<VirtualFileInner, std::io::Error> {
     571         9138 :         Self::open_with_options(
     572         9138 :             path.as_ref(),
     573         9138 :             OpenOptions::new().write(true).create(true).truncate(true),
     574         9138 :             ctx,
     575         9138 :         )
     576         9138 :         .await
     577         9138 :     }
     578              : 
     579              :     /// Open a file with given options.
     580              :     ///
     581              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     582              :     /// they will be applied also when the file is subsequently re-opened, not only
     583              :     /// on the first time. Make sure that's sane!
     584        37122 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     585        37122 :         path: P,
     586        37122 :         open_options: &OpenOptions,
     587        37122 :         _ctx: &RequestContext,
     588        37122 :     ) -> Result<VirtualFileInner, std::io::Error> {
     589        37122 :         let path = path.as_ref();
     590        37122 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     591              : 
     592              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     593              :         // where our caller doesn't get to use the returned VirtualFile before its
     594              :         // slot gets re-used by someone else.
     595        37122 :         let file = observe_duration!(StorageIoOperation::Open, {
     596        37122 :             open_options.open(path.as_std_path()).await?
     597              :         });
     598              : 
     599              :         // Strip all options other than read and write.
     600              :         //
     601              :         // It would perhaps be nicer to check just for the read and write flags
     602              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     603              :         // only to set them.
     604        37122 :         let mut reopen_options = open_options.clone();
     605        37122 :         reopen_options.create(false);
     606        37122 :         reopen_options.create_new(false);
     607        37122 :         reopen_options.truncate(false);
     608        37122 : 
     609        37122 :         let vfile = VirtualFileInner {
     610        37122 :             handle: RwLock::new(handle),
     611        37122 :             pos: 0,
     612        37122 :             path: path.to_owned(),
     613        37122 :             open_options: reopen_options,
     614        37122 :         };
     615        37122 : 
     616        37122 :         // TODO: Under pressure, it's likely the slot will get re-used and
     617        37122 :         // the underlying file closed before they get around to using it.
     618        37122 :         // => https://github.com/neondatabase/neon/issues/6065
     619        37122 :         slot_guard.file.replace(file);
     620        37122 : 
     621        37122 :         Ok(vfile)
     622        37122 :     }
     623              : 
     624              :     /// Async version of [`::utils::crashsafe::overwrite`].
     625              :     ///
     626              :     /// # NB:
     627              :     ///
      628              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but
     629              :     /// it did at an earlier time.
      630              :     /// And it will use this module's [`io_engine`] in the near future, so it's left here.
     631          168 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     632          168 :         final_path: Utf8PathBuf,
     633          168 :         tmp_path: Utf8PathBuf,
     634          168 :         content: B,
     635          168 :     ) -> std::io::Result<()> {
     636          168 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     637          168 :         // See https://github.com/neondatabase/neon/issues/6663
     638          168 : 
     639          168 :         tokio::task::spawn_blocking(move || {
     640          168 :             let slice_storage;
     641          168 :             let content_len = content.bytes_init();
     642          168 :             let content = if content.bytes_init() > 0 {
     643          168 :                 slice_storage = Some(content.slice(0..content_len));
     644          168 :                 slice_storage.as_deref().expect("just set it to Some()")
     645              :             } else {
     646            0 :                 &[]
     647              :             };
     648          168 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     649          168 :                 .maybe_fatal_err("crashsafe_overwrite")
     650          168 :         })
     651          168 :         .await
     652          168 :         .expect("blocking task is never aborted")
     653          168 :     }
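
// Illustrative sketch of the crash-safe overwrite pattern (the real helper is
// utils::crashsafe::overwrite; that it matches this exactly is an assumption):
// write to a temp file, fsync it, rename over the final path, then fsync the
// parent directory so the rename itself survives a crash.
fn crashsafe_overwrite_sketch(
    final_path: &Utf8Path,
    tmp_path: &Utf8Path,
    content: &[u8],
) -> std::io::Result<()> {
    use std::io::Write;
    let mut tmp = std::fs::File::create(tmp_path.as_std_path())?;
    tmp.write_all(content)?;
    tmp.sync_all()?; // the data must be durable before it becomes visible under final_path
    std::fs::rename(tmp_path.as_std_path(), final_path.as_std_path())?;
    let dir = final_path.parent().expect("final_path has a parent");
    File::open(dir.as_std_path())?.sync_all()?; // make the rename durable
    Ok(())
}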
     654              : 
     655              :     /// Call File::sync_all() on the underlying File.
     656        17082 :     pub async fn sync_all(&self) -> Result<(), Error> {
     657        17082 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     658        17082 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     659        17082 :             res.maybe_fatal_err("sync_all")
     660              :         })
     661        17082 :     }
     662              : 
     663              :     /// Call File::sync_data() on the underlying File.
     664            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     665            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     666            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     667            0 :             res.maybe_fatal_err("sync_data")
     668              :         })
     669            0 :     }
     670              : 
     671        10992 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     672        10992 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     673        10992 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     674        10992 :             res
     675              :         })
     676        10992 :     }
     677              : 
     678              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     679              :     /// opens it and evicts some other File if necessary. The passed parameter is
     680              :     /// assumed to be a function available for the physical `File`.
     681              :     ///
     682              :     /// We are doing it via a macro as Rust doesn't support async closures that
      683              :     /// take parameters with lifetimes.
     684      4209203 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     685      4209203 :         let open_files = get_open_files();
     686              : 
     687      1140068 :         let mut handle_guard = {
     688              :             // Read the cached slot handle, and see if the slot that it points to still
     689              :             // contains our File.
     690              :             //
     691              :             // We only need to hold the handle lock while we read the current handle. If
     692              :             // another thread closes the file and recycles the slot for a different file,
     693              :             // we will notice that the handle we read is no longer valid and retry.
     694      4209203 :             let mut handle = *self.handle.read().await;
     695              :             loop {
     696              :                 // Check if the slot contains our File
     697              :                 {
     698      4834784 :                     let slot = &open_files.slots[handle.index];
     699      4834784 :                     let slot_guard = slot.inner.read().await;
     700      4834784 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     701              :                         // Found a cached file descriptor.
     702      3069135 :                         slot.recently_used.store(true, Ordering::Relaxed);
     703      3069135 :                         return Ok(FileGuard { slot_guard });
     704      1765649 :                     }
     705              :                 }
     706              : 
     707              :                 // The slot didn't contain our File. We will have to open it ourselves,
     708              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     709              :                 // that no other thread will try to concurrently open the same file.
     710      1765649 :                 let handle_guard = self.handle.write().await;
     711              : 
     712              :                 // If another thread changed the handle while we were not holding the lock,
     713              :                 // then the handle might now be valid again. Loop back to retry.
     714      1765649 :                 if *handle_guard != handle {
     715       625581 :                     handle = *handle_guard;
     716       625581 :                     continue;
     717      1140068 :                 }
     718      1140068 :                 break handle_guard;
     719              :             }
     720              :         };
     721              : 
     722              :         // We need to open the file ourselves. The handle in the VirtualFile is
     723              :         // now locked in write-mode. Find a free slot to put it in.
     724      1140068 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     725              : 
     726              :         // Re-open the physical file.
      727              :         // NB: we use StorageIoOperation::OpenAfterReplace for this to distinguish this
     728              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     729              :         // of the virtual file descriptor cache.
     730      1140068 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     731      1140068 :             self.open_options.open(self.path.as_std_path()).await?
     732              :         });
     733              : 
     734              :         // Store the File in the slot and update the handle in the VirtualFile
     735              :         // to point to it.
     736      1140068 :         slot_guard.file.replace(file);
     737      1140068 : 
     738      1140068 :         *handle_guard = handle;
     739      1140068 : 
     740      1140068 :         Ok(FileGuard {
     741      1140068 :             slot_guard: slot_guard.downgrade(),
     742      1140068 :         })
     743      4209203 :     }
     744              : 
     745         1620 :     pub fn remove(self) {
     746         1620 :         let path = self.path.clone();
     747         1620 :         drop(self);
     748         1620 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     749         1620 :     }
     750              : 
     751        34668 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     752        34668 :         match pos {
     753        34608 :             SeekFrom::Start(offset) => {
     754        34608 :                 self.pos = offset;
     755        34608 :             }
     756           24 :             SeekFrom::End(offset) => {
     757           24 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     758           24 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     759              :             }
     760           36 :             SeekFrom::Current(offset) => {
     761           36 :                 let pos = self.pos as i128 + offset as i128;
     762           36 :                 if pos < 0 {
     763           12 :                     return Err(Error::new(
     764           12 :                         ErrorKind::InvalidInput,
     765           12 :                         "offset would be negative",
     766           12 :                     ));
     767           24 :                 }
     768           24 :                 if pos > u64::MAX as i128 {
     769            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     770           24 :                 }
     771           24 :                 self.pos = pos as u64;
     772              :             }
     773              :         }
     774        34644 :         Ok(self.pos)
     775        34668 :     }
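
// Illustrative sketch: only SeekFrom::End touches the underlying file (via the
// with_file!/lock_file path above); Start and Current just update the virtual
// position in memory.
async fn seek_usage_sketch(file: &mut VirtualFileInner) -> Result<(), Error> {
    let _ = file.seek(SeekFrom::Start(100)).await?; // pos = 100, no I/O
    let _ = file.seek(SeekFrom::Current(-50)).await?; // pos = 50, no I/O
    let _end = file.seek(SeekFrom::End(0)).await?; // performs a real seek on the physical file
    Ok(())
}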
     776              : 
     777              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     778              :     ///
     779              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     780      3349836 :     pub async fn read_exact_at<Buf>(
     781      3349836 :         &self,
     782      3349836 :         slice: Slice<Buf>,
     783      3349836 :         offset: u64,
     784      3349836 :         ctx: &RequestContext,
     785      3349836 :     ) -> Result<Slice<Buf>, Error>
     786      3349836 :     where
     787      3349836 :         Buf: IoBufAlignedMut + Send,
     788      3349836 :     {
     789      3349836 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     790      3349836 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     791              :         } else {
     792            0 :             None
     793              :         };
     794              : 
     795      3349836 :         let original_bounds = slice.bounds();
     796      3349836 :         let (buf, res) =
     797      3349836 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     798      3349836 :         let res = res.map(|_| buf.slice(original_bounds));
     799              : 
     800      3349836 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     801      3349836 :             if let Ok(slice) = &res {
     802      3349836 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     803      3349836 :                 assert_eq!(original_bounds, returned_bounds);
     804            0 :             }
     805            0 :         }
     806              : 
     807      3349836 :         res
     808      3349836 :     }
     809              : 
     810              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     811       185697 :     pub async fn read_exact_at_page(
     812       185697 :         &self,
     813       185697 :         page: PageWriteGuard<'static>,
     814       185697 :         offset: u64,
     815       185697 :         ctx: &RequestContext,
     816       185697 :     ) -> Result<PageWriteGuard<'static>, Error> {
     817       185697 :         let buf = PageWriteGuardBuf { page }.slice_full();
     818       185697 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     819       185697 :         self.read_exact_at(buf, offset, ctx)
     820       185697 :             .await
     821       185697 :             .map(|slice| slice.into_inner().page)
     822       185697 :     }
     823              : 
     824              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     825        39906 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     826        39906 :         &self,
     827        39906 :         buf: FullSlice<Buf>,
     828        39906 :         mut offset: u64,
     829        39906 :         ctx: &RequestContext,
     830        39906 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     831        39906 :         let buf = buf.into_raw_slice();
     832        39906 :         let bounds = buf.bounds();
     833        39906 :         let restore =
     834        39906 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     835        39906 :         let mut buf = buf;
     836        79812 :         while !buf.is_empty() {
     837        39906 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     838        39906 :             buf = tmp.into_raw_slice();
     839            0 :             match res {
     840              :                 Ok(0) => {
     841            0 :                     return (
     842            0 :                         restore(buf),
     843            0 :                         Err(Error::new(
     844            0 :                             std::io::ErrorKind::WriteZero,
     845            0 :                             "failed to write whole buffer",
     846            0 :                         )),
     847            0 :                     );
     848              :                 }
     849        39906 :                 Ok(n) => {
     850        39906 :                     buf = buf.slice(n..);
     851        39906 :                     offset += n as u64;
     852        39906 :                 }
     853            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     854            0 :                 Err(e) => return (restore(buf), Err(e)),
     855              :             }
     856              :         }
     857        39906 :         (restore(buf), Ok(()))
     858        39906 :     }
     859              : 
     860              :     /// Writes `buf` to the file at the current offset.
     861              :     ///
     862              :     /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
     863       788927 :     pub async fn write_all<Buf: IoBuf + Send>(
     864       788927 :         &mut self,
     865       788927 :         buf: FullSlice<Buf>,
     866       788927 :         ctx: &RequestContext,
     867       788927 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     868       788927 :         let buf = buf.into_raw_slice();
     869       788927 :         let bounds = buf.bounds();
     870       788927 :         let restore =
     871       788927 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     872       788927 :         let nbytes = buf.len();
     873       788927 :         let mut buf = buf;
     874      1577614 :         while !buf.is_empty() {
     875       788699 :             let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
     876       788699 :             buf = tmp.into_raw_slice();
     877           12 :             match res {
     878              :                 Ok(0) => {
     879            0 :                     return (
     880            0 :                         restore(buf),
     881            0 :                         Err(Error::new(
     882            0 :                             std::io::ErrorKind::WriteZero,
     883            0 :                             "failed to write whole buffer",
     884            0 :                         )),
     885            0 :                     );
     886              :                 }
     887       788687 :                 Ok(n) => {
     888       788687 :                     buf = buf.slice(n..);
     889       788687 :                 }
     890           12 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     891           12 :                 Err(e) => return (restore(buf), Err(e)),
     892              :             }
     893              :         }
     894       788915 :         (restore(buf), Ok(nbytes))
     895       788927 :     }
     896              : 
     897       788699 :     async fn write<B: IoBuf + Send>(
     898       788699 :         &mut self,
     899       788699 :         buf: FullSlice<B>,
     900       788699 :         ctx: &RequestContext,
     901       788699 :     ) -> (FullSlice<B>, Result<usize, std::io::Error>) {
     902       788699 :         let pos = self.pos;
     903       788699 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     904       788699 :         let n = match res {
     905       788687 :             Ok(n) => n,
     906           12 :             Err(e) => return (buf, Err(e)),
     907              :         };
     908       788687 :         self.pos += n as u64;
     909       788687 :         (buf, Ok(n))
     910       788699 :     }
     911              : 
     912      3352500 :     pub(crate) async fn read_at<Buf>(
     913      3352500 :         &self,
     914      3352500 :         buf: tokio_epoll_uring::Slice<Buf>,
     915      3352500 :         offset: u64,
     916      3352500 :         ctx: &RequestContext,
     917      3352500 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     918      3352500 :     where
     919      3352500 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     920      3352500 :     {
     921      3352500 :         let file_guard = match self
     922      3352500 :             .lock_file()
     923      3352500 :             .await
     924      3352500 :             .maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
     925              :         {
     926      3352500 :             Ok(file_guard) => file_guard,
     927            0 :             Err(e) => return (buf, Err(e)),
     928              :         };
     929              : 
     930      3352500 :         observe_duration!(StorageIoOperation::Read, {
     931      3352500 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     932      3352500 :             let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
     933      3352500 :             if let Ok(size) = res {
     934      3352488 :                 ctx.io_size_metrics().read.add(size.into_u64());
     935      3352488 :             }
     936      3352500 :             (buf, res)
     937              :         })
     938      3352500 :     }
     939              : 
     940              :     /// The function aborts the process if the error is fatal.
     941       828605 :     async fn write_at<B: IoBuf + Send>(
     942       828605 :         &self,
     943       828605 :         buf: FullSlice<B>,
     944       828605 :         offset: u64,
     945       828605 :         ctx: &RequestContext,
     946       828605 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     947       828605 :         let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
     948       828605 :         let result = result.maybe_fatal_err("write_at");
     949       828605 :         (slice, result)
     950       828605 :     }
     951              : 
     952       828605 :     async fn write_at_inner<B: IoBuf + Send>(
     953       828605 :         &self,
     954       828605 :         buf: FullSlice<B>,
     955       828605 :         offset: u64,
     956       828605 :         ctx: &RequestContext,
     957       828605 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     958       828605 :         let file_guard = match self.lock_file().await {
     959       828605 :             Ok(file_guard) => file_guard,
     960            0 :             Err(e) => return (buf, Err(e)),
     961              :         };
     962       828605 :         observe_duration!(StorageIoOperation::Write, {
     963       828605 :             let ((_file_guard, buf), result) =
     964       828605 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     965       828605 :             if let Ok(size) = result {
     966       828593 :                 ctx.io_size_metrics().write.add(size.into_u64());
     967       828593 :             }
     968       828605 :             (buf, result)
     969              :         })
     970       828605 :     }
     971              : 
     972         1344 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
     973         1344 :         let mut tmp = vec![0; 128];
     974              :         loop {
     975         2664 :             let slice = tmp.slice(..128);
     976         2664 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
     977           12 :             match res {
     978         1332 :                 Ok(0) => return Ok(()),
     979         1320 :                 Ok(n) => {
     980         1320 :                     self.pos += n as u64;
     981         1320 :                     buf.extend_from_slice(&slice[..n]);
     982         1320 :                 }
     983           12 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     984           12 :                 Err(e) => return Err(e),
     985              :             }
     986         1320 :             tmp = slice.into_inner();
     987              :         }
     988         1344 :     }
     989              : }
     990              : 
     991              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     992      3349884 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     993      3349884 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     994      3349884 :     mut offset: u64,
     995      3349884 :     mut read_at: F,
     996      3349884 : ) -> (Buf, std::io::Result<()>)
     997      3349884 : where
     998      3349884 :     Buf: IoBufMut + Send,
     999      3349884 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
    1000      3349884 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
    1001      3349884 : {
    1002      6699780 :     while buf.bytes_total() != 0 {
    1003              :         let res;
    1004      3349908 :         (buf, res) = read_at(buf, offset).await;
    1005            0 :         match res {
    1006           12 :             Ok(0) => break,
    1007      3349896 :             Ok(n) => {
    1008      3349896 :                 buf = buf.slice(n..);
    1009      3349896 :                 offset += n as u64;
    1010      3349896 :             }
    1011            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1012            0 :             Err(e) => return (buf.into_inner(), Err(e)),
    1013              :         }
    1014              :     }
    1015              :     // NB: don't use `buf.is_empty()` here; it comes from the
    1016              :     // `impl Deref for Slice { Target = [u8] }`, and the &[u8]
    1017              :     // it returns only covers the initialized portion of `buf`,
    1018              :     // whereas we need to ensure that we filled the entire
    1019              :     // buffer that the user passed in.
    1020      3349884 :     if buf.bytes_total() != 0 {
    1021           12 :         (
    1022           12 :             buf.into_inner(),
    1023           12 :             Err(std::io::Error::new(
    1024           12 :                 std::io::ErrorKind::UnexpectedEof,
    1025           12 :                 "failed to fill whole buffer",
    1026           12 :             )),
    1027           12 :         )
    1028              :     } else {
    1029      3349872 :         assert_eq!(buf.len(), buf.bytes_total());
    1030      3349872 :         (buf.into_inner(), Ok(()))
    1031              :     }
    1032      3349884 : }
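
// Editor's sketch (illustrative, not part of the original source): driving the
// `read_exact_at_impl` retry loop above with an in-memory "file". The `data`
// slice and the closure are hypothetical stand-ins for a real positional read
// such as `VirtualFileInner::read_at`.
#[cfg(test)]
#[allow(dead_code)]
async fn read_exact_at_impl_usage_sketch() -> std::io::Result<()> {
    use tokio_epoll_uring::BoundedBufMut;
    let data: &'static [u8] = b"hello world";
    // Ask for bytes 6..11 ("world"). Note that `bytes_total()`, not
    // `is_empty()`, is what tracks how much of the buffer remains unfilled.
    let buf = Vec::<u8>::with_capacity(5).slice_full();
    let (buf, res) = read_exact_at_impl(buf, 6, |mut slice, offset| async move {
        // Mimic a (possibly short) positional read: copy as many bytes as fit.
        let n = std::cmp::min(slice.bytes_total(), data.len().saturating_sub(offset as usize));
        slice.put_slice(&data[offset as usize..offset as usize + n]);
        (slice, Ok(n))
    })
    .await;
    res?; // UnexpectedEof here would mean EOF before the buffer was filled
    assert_eq!(buf, b"world".to_vec());
    Ok(())
}
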
    1033              : 
    1034              : #[cfg(test)]
    1035              : mod test_read_exact_at_impl {
    1036              : 
    1037              :     use std::collections::VecDeque;
    1038              :     use std::sync::Arc;
    1039              : 
    1040              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
    1041              : 
    1042              :     use super::read_exact_at_impl;
    1043              : 
    1044              :     struct Expectation {
    1045              :         offset: u64,
    1046              :         bytes_total: usize,
    1047              :         result: std::io::Result<Vec<u8>>,
    1048              :     }
    1049              :     struct MockReadAt {
    1050              :         expectations: VecDeque<Expectation>,
    1051              :     }
    1052              : 
    1053              :     impl MockReadAt {
    1054           72 :         async fn read_at(
    1055           72 :             &mut self,
    1056           72 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
    1057           72 :             offset: u64,
    1058           72 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
    1059           72 :             let exp = self
    1060           72 :                 .expectations
    1061           72 :                 .pop_front()
    1062           72 :                 .expect("read_at called but we have no expectations left");
    1063           72 :             assert_eq!(exp.offset, offset);
    1064           72 :             assert_eq!(exp.bytes_total, buf.bytes_total());
    1065           72 :             match exp.result {
    1066           72 :                 Ok(bytes) => {
    1067           72 :                     assert!(bytes.len() <= buf.bytes_total());
    1068           72 :                     buf.put_slice(&bytes);
    1069           72 :                     (buf, Ok(bytes.len()))
    1070              :                 }
    1071            0 :                 Err(e) => (buf, Err(e)),
    1072              :             }
    1073           72 :         }
    1074              :     }
    1075              : 
    1076              :     impl Drop for MockReadAt {
    1077           48 :         fn drop(&mut self) {
    1078           48 :             assert_eq!(self.expectations.len(), 0);
    1079           48 :         }
    1080              :     }
    1081              : 
    1082              :     #[tokio::test]
    1083           12 :     async fn test_basic() {
    1084           12 :         let buf = Vec::with_capacity(5).slice_full();
    1085           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1086           12 :             expectations: VecDeque::from(vec![Expectation {
    1087           12 :                 offset: 0,
    1088           12 :                 bytes_total: 5,
    1089           12 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
    1090           12 :             }]),
    1091           12 :         }));
    1092           12 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1093           12 :             let mock_read_at = Arc::clone(&mock_read_at);
    1094           12 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1095           12 :         })
    1096           12 :         .await;
    1097           12 :         assert!(res.is_ok());
    1098           12 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
    1099           12 :     }
    1100              : 
    1101              :     #[tokio::test]
    1102           12 :     async fn test_empty_buf_issues_no_syscall() {
    1103           12 :         let buf = Vec::new().slice_full();
    1104           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1105           12 :             expectations: VecDeque::new(),
    1106           12 :         }));
    1107           12 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1108            0 :             let mock_read_at = Arc::clone(&mock_read_at);
    1109           12 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1110           12 :         })
    1111           12 :         .await;
    1112           12 :         assert!(res.is_ok());
    1113           12 :     }
    1114              : 
    1115              :     #[tokio::test]
    1116           12 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
    1117           12 :         let buf = Vec::with_capacity(4).slice_full();
    1118           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1119           12 :             expectations: VecDeque::from(vec![
    1120           12 :                 Expectation {
    1121           12 :                     offset: 0,
    1122           12 :                     bytes_total: 4,
    1123           12 :                     result: Ok(vec![b'a', b'b']),
    1124           12 :                 },
    1125           12 :                 Expectation {
    1126           12 :                     offset: 2,
    1127           12 :                     bytes_total: 2,
    1128           12 :                     result: Ok(vec![b'c', b'd']),
    1129           12 :                 },
    1130           12 :             ]),
    1131           12 :         }));
    1132           24 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1133           24 :             let mock_read_at = Arc::clone(&mock_read_at);
    1134           24 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1135           24 :         })
    1136           12 :         .await;
    1137           12 :         assert!(res.is_ok());
    1138           12 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
    1139           12 :     }
    1140              : 
    1141              :     #[tokio::test]
    1142           12 :     async fn test_eof_before_buffer_full() {
    1143           12 :         let buf = Vec::with_capacity(3).slice_full();
    1144           12 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1145           12 :             expectations: VecDeque::from(vec![
    1146           12 :                 Expectation {
    1147           12 :                     offset: 0,
    1148           12 :                     bytes_total: 3,
    1149           12 :                     result: Ok(vec![b'a']),
    1150           12 :                 },
    1151           12 :                 Expectation {
    1152           12 :                     offset: 1,
    1153           12 :                     bytes_total: 2,
    1154           12 :                     result: Ok(vec![b'b']),
    1155           12 :                 },
    1156           12 :                 Expectation {
    1157           12 :                     offset: 2,
    1158           12 :                     bytes_total: 1,
    1159           12 :                     result: Ok(vec![]),
    1160           12 :                 },
    1161           12 :             ]),
    1162           12 :         }));
    1163           36 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1164           36 :             let mock_read_at = Arc::clone(&mock_read_at);
    1165           36 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1166           36 :         })
    1167           12 :         .await;
    1168           12 :         let Err(err) = res else {
    1169           12 :             panic!("should return an error");
    1170           12 :         };
    1171           12 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    1172           12 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
    1173           12 :         // buffer contents on error are unspecified
    1174           12 :     }
    1175              : }
    1176              : 
    1177              : struct FileGuard {
    1178              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
    1179              : }
    1180              : 
    1181              : impl AsRef<OwnedFd> for FileGuard {
    1182      4209203 :     fn as_ref(&self) -> &OwnedFd {
    1183      4209203 :         // This unwrap is safe because we only create `FileGuard`s
    1184      4209203 :         // if we know that the file is Some.
    1185      4209203 :         self.slot_guard.file.as_ref().unwrap()
    1186      4209203 :     }
    1187              : }
    1188              : 
    1189              : impl FileGuard {
    1190              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1191      2104256 :     fn with_std_file<F, R>(&self, with: F) -> R
    1192      2104256 :     where
    1193      2104256 :         F: FnOnce(&File) -> R,
    1194      2104256 :     {
    1195      2104256 :         // SAFETY:
    1196      2104256 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1197      2104256 :         // - `&` usage below: `self` is `&`, hence the Rust type system guarantees there is no `&mut`
    1198      2104256 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1199      2104256 :         let res = with(&file);
    1200      2104256 :         let _ = file.into_raw_fd();
    1201      2104256 :         res
    1202      2104256 :     }
    1203              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1204           24 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
    1205           24 :     where
    1206           24 :         F: FnOnce(&mut File) -> R,
    1207           24 :     {
    1208           24 :         // SAFETY:
    1209           24 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1210           24 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1211           24 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1212           24 :         let res = with(&mut file);
    1213           24 :         let _ = file.into_raw_fd();
    1214           24 :         res
    1215           24 :     }
    1216              : }
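
// Editor's sketch (illustrative, not part of the original source): the
// borrow-as-std-File pattern used by `with_std_file{,_mut}` above, reduced to
// a bare RawFd. `into_raw_fd()` relinquishes ownership so the temporary `File`
// does not close the fd on drop; only the real owner ever closes it.
#[allow(dead_code)]
fn with_borrowed_fd<R>(fd: RawFd, with: impl FnOnce(&File) -> R) -> R {
    // SAFETY: the caller must guarantee `fd` stays open for the duration of `with`.
    let file = unsafe { File::from_raw_fd(fd) };
    let res = with(&file);
    let _ = file.into_raw_fd(); // forget, don't close
    res
}
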
    1217              : 
    1218              : impl tokio_epoll_uring::IoFd for FileGuard {
    1219      2104923 :     unsafe fn as_fd(&self) -> RawFd {
    1220      2104923 :         let owned_fd: &OwnedFd = self.as_ref();
    1221      2104923 :         owned_fd.as_raw_fd()
    1222      2104923 :     }
    1223              : }
    1224              : 
    1225              : #[cfg(test)]
    1226              : impl VirtualFile {
    1227       125496 :     pub(crate) async fn read_blk(
    1228       125496 :         &self,
    1229       125496 :         blknum: u32,
    1230       125496 :         ctx: &RequestContext,
    1231       125496 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1232       125496 :         self.inner.read_blk(blknum, ctx).await
    1233       125496 :     }
    1234              : }
    1235              : 
    1236              : #[cfg(test)]
    1237              : impl VirtualFileInner {
    1238       125496 :     pub(crate) async fn read_blk(
    1239       125496 :         &self,
    1240       125496 :         blknum: u32,
    1241       125496 :         ctx: &RequestContext,
    1242       125496 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1243              :         use crate::page_cache::PAGE_SZ;
    1244       125496 :         let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    1245       125496 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1246       125496 :         let slice = self
    1247       125496 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1248       125496 :             .await?;
    1249       125496 :         Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
    1250       125496 :             slice.into_inner(),
    1251       125496 :         ))
    1252       125496 :     }
    1253              : }
    1254              : 
    1255              : impl Drop for VirtualFileInner {
    1256              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1257        32095 :     fn drop(&mut self) {
    1258        32095 :         let handle = self.handle.get_mut();
    1259              : 
    1260        32095 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1261        32095 :             if slot_guard.tag == tag {
    1262        28639 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1263              :                 // there is also operation "close-by-replace" for closes done on eviction for
    1264              :                 // comparison.
    1265        28639 :                 if let Some(fd) = slot_guard.file.take() {
    1266        28639 :                     STORAGE_IO_TIME_METRIC
    1267        28639 :                         .get(StorageIoOperation::Close)
    1268        28639 :                         .observe_closure_duration(|| drop(fd));
    1269        28639 :                 }
    1270         3456 :             }
    1271        32095 :         }
    1272              : 
    1273              :         // We don't have async drop so we cannot directly await the lock here.
    1274              :         // Instead, first do a best-effort attempt at closing the underlying
    1275              :         // file descriptor by using `try_write`, and if that fails, spawn
    1276              :         // a tokio task to do it asynchronously: we just want it to be
    1277              :         // cleaned up eventually.
    1278              :         // Most of the time, though, the `try_write` should succeed,
    1279              :         // as we have `&mut self` access. In other words, if the slot
    1280              :         // is still occupied by our file, there should be no access from
    1281              :         // other I/O operations; the only other possible place to lock
    1282              :         // the slot is the clock algorithm scanning for free slots.
    1283        32095 :         let slot = &get_open_files().slots[handle.index];
    1284        32095 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1285        32095 :             clean_slot(slot, slot_guard, handle.tag);
    1286        32095 :         } else {
    1287            0 :             let tag = handle.tag;
    1288            0 :             tokio::spawn(async move {
    1289            0 :                 let slot_guard = slot.inner.write().await;
    1290            0 :                 clean_slot(slot, slot_guard, tag);
    1291            0 :             });
    1292            0 :         };
    1293        32095 :     }
    1294              : }
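
// Editor's sketch (illustrative, not part of the original source): the
// "best-effort sync cleanup, async fallback" shape of the Drop impl above,
// on a bare tokio RwLock holding an optional resource.
#[allow(dead_code)]
fn cleanup_on_drop_sketch(lock: &'static RwLock<Option<File>>) {
    if let Ok(mut guard) = lock.try_write() {
        // Fast path: the lock is uncontended, close the file right here.
        drop(guard.take());
    } else {
        // Slow path: someone else holds the lock; defer to a task so the
        // resource is still cleaned up eventually.
        tokio::spawn(async move {
            drop(lock.write().await.take());
        });
    }
}
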
    1295              : 
    1296              : impl OwnedAsyncWriter for VirtualFile {
    1297        39882 :     async fn write_all_at<Buf: IoBufAligned + Send>(
    1298        39882 :         &self,
    1299        39882 :         buf: FullSlice<Buf>,
    1300        39882 :         offset: u64,
    1301        39882 :         ctx: &RequestContext,
    1302        39882 :     ) -> (FullSlice<Buf>, std::io::Result<()>) {
    1303        39882 :         VirtualFile::write_all_at(self, buf, offset, ctx).await
    1304        39882 :     }
    1305              : }
    1306              : 
    1307              : impl OpenFiles {
    1308         1452 :     fn new(num_slots: usize) -> OpenFiles {
    1309         1452 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1310        14520 :         for _ in 0..num_slots {
    1311        14520 :             let slot = Slot {
    1312        14520 :                 recently_used: AtomicBool::new(false),
    1313        14520 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1314        14520 :             };
    1315        14520 :             slots.push(slot);
    1316        14520 :         }
    1317              : 
    1318         1452 :         OpenFiles {
    1319         1452 :             next: AtomicUsize::new(0),
    1320         1452 :             slots: Box::leak(slots),
    1321         1452 :         }
    1322         1452 :     }
    1323              : }
    1324              : 
    1325              : ///
    1326              : /// Initialize the virtual file module. This must be called once at page
    1327              : /// server startup.
    1328              : ///
    1329              : #[cfg(not(test))]
    1330            0 : pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: SyncMode) {
    1331            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1332            0 :         panic!("virtual_file::init called twice");
    1333            0 :     }
    1334            0 :     set_io_mode(mode);
    1335            0 :     io_engine::init(engine);
    1336            0 :     SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
    1337            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1338            0 : }
    1339              : 
    1340              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1341              : 
    1342              : // Get a handle to the global slots array.
    1343      4278420 : fn get_open_files() -> &'static OpenFiles {
    1344      4278420 :     //
    1345      4278420 :     // In unit tests, page server startup doesn't happen and no one calls
    1346      4278420 :     // virtual_file::init(). Initialize it here, with a small array.
    1347      4278420 :     //
    1348      4278420 :     // This applies to the virtual file tests below, but all other unit
    1349      4278420 :     // tests too, so the virtual file facility is always usable in
    1350      4278420 :     // unit tests.
    1351      4278420 :     //
    1352      4278420 :     if cfg!(test) {
    1353      4278420 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1354              :     } else {
    1355            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1356              :     }
    1357      4278420 : }
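
// Editor's sketch (illustrative, not part of the original source): the
// init-once-or-lazily pattern used by OPEN_FILES above, shown on a
// hypothetical OnceCell<usize>.
#[allow(dead_code)]
static SKETCH_SLOTS: OnceCell<usize> = OnceCell::new();

#[allow(dead_code)]
fn sketch_init(num_slots: usize) {
    // Startup calls this exactly once; a second call is a programming error.
    if SKETCH_SLOTS.set(num_slots).is_err() {
        panic!("sketch_init called twice");
    }
}

#[allow(dead_code)]
fn sketch_get() -> usize {
    if cfg!(test) {
        // Unit tests never run startup; fall back to a small default.
        *SKETCH_SLOTS.get_or_init(|| 10)
    } else {
        *SKETCH_SLOTS.get().expect("sketch_init not called yet")
    }
}
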
    1358              : 
    1359              : /// Gets the io buffer alignment.
    1360            0 : pub(crate) const fn get_io_buffer_alignment() -> usize {
    1361            0 :     DEFAULT_IO_BUFFER_ALIGNMENT
    1362            0 : }
    1363              : 
    1364              : pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
    1365              : pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
    1366              : pub(crate) type IoPageSlice<'a> =
    1367              :     AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
    1368              : 
    1369              : static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
    1370         1308 :     once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
    1371              : 
    1372            0 : pub fn set_io_mode(mode: IoMode) {
    1373            0 :     IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
    1374            0 : }
    1375              : 
    1376        15468 : pub(crate) fn get_io_mode() -> IoMode {
    1377        15468 :     IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
    1378        15468 : }
    1379              : 
    1380              : static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
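
// Editor's sketch (illustrative, not part of the original source): the
// enum-in-an-AtomicU8 round trip used for IO_MODE and SYNC_MODE above, with a
// hypothetical two-variant mode enum.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum SketchMode {
    Fast = 0,
    Safe = 1,
}

impl TryFrom<u8> for SketchMode {
    type Error = u8;
    fn try_from(v: u8) -> Result<Self, u8> {
        match v {
            0 => Ok(SketchMode::Fast),
            1 => Ok(SketchMode::Safe),
            other => Err(other),
        }
    }
}

#[allow(dead_code)]
static SKETCH_MODE: AtomicU8 = AtomicU8::new(SketchMode::Fast as u8);

#[allow(dead_code)]
fn sketch_get_mode() -> SketchMode {
    // Relaxed suffices here: the mode is an independent flag, not used to
    // synchronize access to other data.
    SketchMode::try_from(SKETCH_MODE.load(Ordering::Relaxed)).unwrap()
}
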
    1381              : 
    1382              : #[cfg(test)]
    1383              : mod tests {
    1384              :     use std::io::Write;
    1385              :     use std::os::unix::fs::FileExt;
    1386              :     use std::sync::Arc;
    1387              : 
    1388              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1389              :     use owned_buffers_io::slice::SliceMutExt;
    1390              :     use rand::seq::SliceRandom;
    1391              :     use rand::{Rng, thread_rng};
    1392              : 
    1393              :     use super::*;
    1394              :     use crate::context::DownloadBehavior;
    1395              :     use crate::task_mgr::TaskKind;
    1396              : 
    1397              :     enum MaybeVirtualFile {
    1398              :         VirtualFile(VirtualFile),
    1399              :         File(File),
    1400              :     }
    1401              : 
    1402              :     impl From<VirtualFile> for MaybeVirtualFile {
    1403           36 :         fn from(vf: VirtualFile) -> Self {
    1404           36 :             MaybeVirtualFile::VirtualFile(vf)
    1405           36 :         }
    1406              :     }
    1407              : 
    1408              :     impl MaybeVirtualFile {
    1409         2424 :         async fn read_exact_at(
    1410         2424 :             &self,
    1411         2424 :             mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
    1412         2424 :             offset: u64,
    1413         2424 :             ctx: &RequestContext,
    1414         2424 :         ) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
    1415         2424 :             match self {
    1416         1212 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1417         1212 :                 MaybeVirtualFile::File(file) => {
    1418         1212 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1419         1212 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1420              :                 }
    1421              :             }
    1422         2424 :         }
    1423           48 :         async fn write_all_at<Buf: IoBufAligned + Send>(
    1424           48 :             &self,
    1425           48 :             buf: FullSlice<Buf>,
    1426           48 :             offset: u64,
    1427           48 :             ctx: &RequestContext,
    1428           48 :         ) -> Result<(), Error> {
    1429           48 :             match self {
    1430           24 :                 MaybeVirtualFile::VirtualFile(file) => {
    1431           24 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1432           24 :                     res
    1433              :                 }
    1434           24 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1435              :             }
    1436           48 :         }
    1437          216 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1438          216 :             match self {
    1439          108 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1440          108 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1441              :             }
    1442          216 :         }
    1443           48 :         async fn write_all<Buf: IoBuf + Send>(
    1444           48 :             &mut self,
    1445           48 :             buf: FullSlice<Buf>,
    1446           48 :             ctx: &RequestContext,
    1447           48 :         ) -> Result<(), Error> {
    1448           48 :             match self {
    1449           24 :                 MaybeVirtualFile::VirtualFile(file) => {
    1450           24 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1451           24 :                     res.map(|_| ())
    1452              :                 }
    1453           24 :                 MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
    1454              :             }
    1455           48 :         }
    1456              : 
    1457              :         // Helper function to slurp contents of a file, starting at the current position,
    1458              :         // into a string
    1459         2652 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1460              :             use std::io::Read;
    1461         2652 :             let mut buf = String::new();
    1462         2652 :             match self {
    1463         1344 :                 MaybeVirtualFile::VirtualFile(file) => {
    1464         1344 :                     let mut buf = Vec::new();
    1465         1344 :                     file.read_to_end(&mut buf, ctx).await?;
    1466         1332 :                     return Ok(String::from_utf8(buf).unwrap());
    1467              :                 }
    1468         1308 :                 MaybeVirtualFile::File(file) => {
    1469         1308 :                     file.read_to_string(&mut buf)?;
    1470              :                 }
    1471              :             }
    1472         1296 :             Ok(buf)
    1473         2652 :         }
    1474              : 
    1475              :         // Helper function to slurp a portion of a file into a string
    1476         2424 :         async fn read_string_at(
    1477         2424 :             &mut self,
    1478         2424 :             pos: u64,
    1479         2424 :             len: usize,
    1480         2424 :             ctx: &RequestContext,
    1481         2424 :         ) -> Result<String, Error> {
    1482         2424 :             let slice = IoBufferMut::with_capacity(len).slice_full();
    1483         2424 :             assert_eq!(slice.bytes_total(), len);
    1484         2424 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1485         2424 :             let buf = slice.into_inner();
    1486         2424 :             assert_eq!(buf.len(), len);
    1487              : 
    1488         2424 :             Ok(String::from_utf8(buf.to_vec()).unwrap())
    1489         2424 :         }
    1490              :     }
    1491              : 
    1492              :     #[tokio::test]
    1493           12 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1494           12 :         // The real work is done in the test_files() helper function. This
    1495           12 :         // allows us to run the same set of tests against a native File, and
    1496           12 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1497           12 :         // but this allows us to verify that the operations return the same
    1498           12 :         // results with VirtualFiles as with native Files. (Except that with
    1499           12 :         // native files, you will run out of file descriptors if the ulimit
    1500           12 :         // is low enough.)
    1501           12 :         struct A;
    1502           12 : 
    1503           12 :         impl Adapter for A {
    1504         1236 :             async fn open(
    1505         1236 :                 path: Utf8PathBuf,
    1506         1236 :                 opts: OpenOptions,
    1507         1236 :                 ctx: &RequestContext,
    1508         1236 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1509         1236 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1510         1236 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1511         1236 :             }
    1512           12 :         }
    1513           12 :         test_files::<A>("virtual_files").await
    1514           12 :     }
    1515              : 
    1516              :     #[tokio::test]
    1517           12 :     async fn test_physical_files() -> anyhow::Result<()> {
    1518           12 :         struct B;
    1519           12 : 
    1520           12 :         impl Adapter for B {
    1521         1236 :             async fn open(
    1522         1236 :                 path: Utf8PathBuf,
    1523         1236 :                 opts: OpenOptions,
    1524         1236 :                 _ctx: &RequestContext,
    1525         1236 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1526           12 :                 Ok(MaybeVirtualFile::File({
    1527         1236 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1528         1236 :                     File::from(owned_fd)
    1529           12 :                 }))
    1530         1236 :             }
    1531           12 :         }
    1532           12 : 
    1533           12 :         test_files::<B>("physical_files").await
    1534           12 :     }
    1535              : 
    1536              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because Rust edition
    1537              :     /// 2024, with its new lifetime capture and outlives rules, is not yet out, this is an async
    1538              :     /// function in a trait, which already benefits from the new lifetime capture rules.
    1539              :     trait Adapter {
    1540              :         async fn open(
    1541              :             path: Utf8PathBuf,
    1542              :             opts: OpenOptions,
    1543              :             ctx: &RequestContext,
    1544              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1545              :     }
    1546              : 
    1547           24 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    1548           24 :     where
    1549           24 :         A: Adapter,
    1550           24 :     {
    1551           24 :         let ctx =
    1552           24 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1553           24 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1554           24 :         std::fs::create_dir_all(&testdir)?;
    1555              : 
    1556           24 :         let path_a = testdir.join("file_a");
    1557           24 :         let mut file_a = A::open(
    1558           24 :             path_a.clone(),
    1559           24 :             OpenOptions::new()
    1560           24 :                 .write(true)
    1561           24 :                 .create(true)
    1562           24 :                 .truncate(true)
    1563           24 :                 .to_owned(),
    1564           24 :             &ctx,
    1565           24 :         )
    1566           24 :         .await?;
    1567              : 
    1568           24 :         file_a
    1569           24 :             .write_all(b"foobar".to_vec().slice_len(), &ctx)
    1570           24 :             .await?;
    1571              : 
    1572              :         // cannot read from a file opened in write-only mode
    1573           24 :         let _ = file_a.read_string(&ctx).await.unwrap_err();
    1574              : 
    1575              :         // Close the file and re-open for reading
    1576           24 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
    1577              : 
    1578              :         // cannot write to a file opened in read-only mode
    1579           24 :         let _ = file_a
    1580           24 :             .write_all(b"bar".to_vec().slice_len(), &ctx)
    1581           24 :             .await
    1582           24 :             .unwrap_err();
    1583           24 : 
    1584           24 :         // Try simple read
    1585           24 :         assert_eq!("foobar", file_a.read_string(&ctx).await?);
    1586              : 
    1587              :         // It's positioned at the EOF now.
    1588           24 :         assert_eq!("", file_a.read_string(&ctx).await?);
    1589              : 
    1590              :         // Test seeks.
    1591           24 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1592           24 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1593              : 
    1594           24 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1595           24 :         assert_eq!("ar", file_a.read_string(&ctx).await?);
    1596              : 
    1597           24 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1598           24 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1599           24 :         assert_eq!("bar", file_a.read_string(&ctx).await?);
    1600              : 
    1601           24 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1602           24 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1603              : 
    1604              :         // Test erroneous seeks to before byte 0
    1605           24 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1606           24 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1607           24 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1608           24 : 
    1609           24 :         // the erroneous seek should have left the position unchanged
    1610           24 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1611              : 
    1612              :         // Create another test file, and try FileExt functions on it.
    1613           24 :         let path_b = testdir.join("file_b");
    1614           24 :         let mut file_b = A::open(
    1615           24 :             path_b.clone(),
    1616           24 :             OpenOptions::new()
    1617           24 :                 .read(true)
    1618           24 :                 .write(true)
    1619           24 :                 .create(true)
    1620           24 :                 .truncate(true)
    1621           24 :                 .to_owned(),
    1622           24 :             &ctx,
    1623           24 :         )
    1624           24 :         .await?;
    1625           24 :         file_b
    1626           24 :             .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
    1627           24 :             .await?;
    1628           24 :         file_b
    1629           24 :             .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
    1630           24 :             .await?;
    1631              : 
    1632           24 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1633              : 
    1634              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1635              :         // open the same file many times. The effect is the same.)
    1636              :         //
    1637              :         // leave file_a positioned at offset 1 before we start
    1638           24 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1639              : 
    1640           24 :         let mut vfiles = Vec::new();
    1641         2424 :         for _ in 0..100 {
    1642         2400 :             let mut vfile = A::open(
    1643         2400 :                 path_b.clone(),
    1644         2400 :                 OpenOptions::new().read(true).to_owned(),
    1645         2400 :                 &ctx,
    1646         2400 :             )
    1647         2400 :             .await?;
    1648         2400 :             assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
    1649         2400 :             vfiles.push(vfile);
    1650              :         }
    1651              : 
    1652              :         // make sure we opened enough files to definitely cause evictions.
    1653           24 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1654              : 
    1655              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1656              :         // from it again. We left the file positioned at offset 1 above.
    1657           24 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1658              : 
    1659              :         // Check that all the other FDs still work too. Use them in random order for
    1660              :         // good measure.
    1661           24 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1662         2400 :         for vfile in vfiles.iter_mut() {
    1663         2400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1664              :         }
    1665              : 
    1666           24 :         Ok(())
    1667           24 :     }
    1668              : 
    1669              :     /// Test using VirtualFiles from many threads concurrently. This exercises both
    1670              :     /// using a lot of VirtualFiles at once, causing evictions, and sharing the same
    1671              :     /// VirtualFile across multiple threads.
    1672              :     #[tokio::test]
    1673           12 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1674           12 :         const SIZE: usize = 8 * 1024;
    1675           12 :         const VIRTUAL_FILES: usize = 100;
    1676           12 :         const THREADS: usize = 100;
    1677           12 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1678           12 : 
    1679           12 :         let ctx =
    1680           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1681           12 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1682           12 :         std::fs::create_dir_all(&testdir)?;
    1683           12 : 
    1684           12 :         // Create a test file.
    1685           12 :         let test_file_path = testdir.join("concurrency_test_file");
    1686           12 :         {
    1687           12 :             let file = File::create(&test_file_path)?;
    1688           12 :             file.write_all_at(&SAMPLE, 0)?;
    1689           12 :         }
    1690           12 : 
    1691           12 :         // Open the file many times.
    1692           12 :         let mut files = Vec::new();
    1693         1212 :         for _ in 0..VIRTUAL_FILES {
    1694         1200 :             let f = VirtualFileInner::open_with_options(
    1695         1200 :                 &test_file_path,
    1696         1200 :                 OpenOptions::new().read(true),
    1697         1200 :                 &ctx,
    1698         1200 :             )
    1699         1200 :             .await?;
    1700         1200 :             files.push(f);
    1701           12 :         }
    1702           12 :         let files = Arc::new(files);
    1703           12 : 
    1704           12 :         // Launch many threads, and use the virtual files concurrently in random order.
    1705           12 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1706           12 :             .worker_threads(THREADS)
    1707           12 :             .thread_name("test_vfile_concurrency thread")
    1708           12 :             .build()
    1709           12 :             .unwrap();
    1710           12 :         let mut hdls = Vec::new();
    1711         1212 :         for _threadno in 0..THREADS {
    1712         1200 :             let files = files.clone();
    1713         1200 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1714         1200 :             let hdl = rt.spawn(async move {
    1715         1200 :                 let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
    1716         1200 :                 let mut rng = rand::rngs::OsRng;
    1717      1200000 :                 for _ in 1..1000 {
    1718      1198800 :                     let f = &files[rng.gen_range(0..files.len())];
    1719      1198800 :                     buf = f
    1720      1198800 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1721      1198800 :                         .await
    1722      1198800 :                         .unwrap()
    1723      1198800 :                         .into_inner();
    1724      1198800 :                     assert!(buf[..] == SAMPLE);
    1725           12 :                 }
    1726         1200 :             });
    1727         1200 :             hdls.push(hdl);
    1728         1200 :         }
    1729         1212 :         for hdl in hdls {
    1730         1200 :             hdl.await?;
    1731           12 :         }
    1732           12 :         std::mem::forget(rt);
    1733           12 : 
    1734           12 :         Ok(())
    1735           12 :     }
    1736              : 
    1737              :     #[tokio::test]
    1738           12 :     async fn test_atomic_overwrite_basic() {
    1739           12 :         let ctx =
    1740           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1741           12 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1742           12 :         std::fs::create_dir_all(&testdir).unwrap();
    1743           12 : 
    1744           12 :         let path = testdir.join("myfile");
    1745           12 :         let tmp_path = testdir.join("myfile.tmp");
    1746           12 : 
    1747           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1748           12 :             .await
    1749           12 :             .unwrap();
    1750           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1751           12 :         let post = file.read_string(&ctx).await.unwrap();
    1752           12 :         assert_eq!(post, "foo");
    1753           12 :         assert!(!tmp_path.exists());
    1754           12 :         drop(file);
    1755           12 : 
    1756           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1757           12 :             .await
    1758           12 :             .unwrap();
    1759           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1760           12 :         let post = file.read_string(&ctx).await.unwrap();
    1761           12 :         assert_eq!(post, "bar");
    1762           12 :         assert!(!tmp_path.exists());
    1763           12 :         drop(file);
    1764           12 :     }
    1765              : 
    1766              :     #[tokio::test]
    1767           12 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1768           12 :         let ctx =
    1769           12 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1770           12 :         let testdir =
    1771           12 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1772           12 :         std::fs::create_dir_all(&testdir).unwrap();
    1773           12 : 
    1774           12 :         let path = testdir.join("myfile");
    1775           12 :         let tmp_path = testdir.join("myfile.tmp");
    1776           12 : 
    1777           12 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1778           12 :         assert!(tmp_path.exists());
    1779           12 : 
    1780           12 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1781           12 :             .await
    1782           12 :             .unwrap();
    1783           12 : 
    1784           12 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1785           12 :         let post = file.read_string(&ctx).await.unwrap();
    1786           12 :         assert_eq!(post, "foo");
    1787           12 :         assert!(!tmp_path.exists());
    1788           12 :         drop(file);
    1789           12 :     }
    1790              : }
        

Generated by: LCOV version 2.1-beta