LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions)
Test:         1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date:    2025-07-16 12:29:03

              Coverage    Total    Hit
Lines:        86.1 %        798    687
Functions:    76.1 %        188    143

            Line data    Source code
       1              : //! VirtualFile is like a normal File, but it's not bound directly to
       2              : //! a file descriptor.
       3              : //!
       4              : //! Instead, the file is opened when it's read from,
       5              : //! and if too many files are open globally in the system, least-recently
       6              : //! used ones are closed.
       7              : //!
       8              : //! To track which files have been recently used, we use the clock algorithm
       9              : //! with a 'recently_used' flag on each slot.
      10              : //!
      11              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      12              : //! src/backend/storage/file/fd.c
      13              : //!
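//! An illustrative usage sketch (not part of the original file; `ctx` and the
//! owned buffer `buf` are assumed to exist, and errors are simply propagated):
//!
//! ```ignore
//! // Opens through the fd cache; the underlying fd may be evicted and
//! // transparently re-opened between operations.
//! let file = VirtualFile::open_v2("path/to/layer/file", &ctx).await?;
//! // Reads take an owned buffer wrapped in a Slice and hand it back.
//! let slice = file.read_exact_at(buf.slice_full(), 0, &ctx).await?;
//! ```
//!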
      14              : use std::fs::File;
      15              : use std::io::{Error, ErrorKind};
      16              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      17              : use std::sync::LazyLock;
      18              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
      19              : 
      20              : use camino::{Utf8Path, Utf8PathBuf};
      21              : use once_cell::sync::OnceCell;
      22              : use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
      23              : use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
      24              : use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
      25              : use owned_buffers_io::io_buf_ext::FullSlice;
      26              : use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
      27              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      28              : use tokio::time::Instant;
      29              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      30              : 
      31              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      32              : use crate::assert_u64_eq_usize::UsizeIsU64;
      33              : use crate::context::RequestContext;
      34              : use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
      35              : use crate::page_cache::{PAGE_SZ, PageWriteGuard};
      36              : 
      37              : pub(crate) use api::IoMode;
      38              : pub(crate) use io_engine::IoEngineKind;
      39              : pub use io_engine::{
      40              :     FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
      41              :     io_engine_for_bench,
      42              : };
      43              : pub(crate) use metadata::Metadata;
      44              : pub(crate) use open_options::*;
      45              : pub use pageserver_api::models::virtual_file as api;
      46              : pub use temporary::TempVirtualFile;
      47              : 
      48              : pub(crate) mod io_engine;
      49              : mod metadata;
      50              : mod open_options;
      51              : mod temporary;
      52              : pub(crate) mod owned_buffers_io {
      53              :     //! Abstractions for IO with owned buffers.
      54              :     //!
       55              :     //! Not actually tied to [`crate::virtual_file`] specifically, but it's the primary
      56              :     //! reason we need this abstraction.
      57              :     //!
      58              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      59              :     //! but for the time being we're proving out the primitives in the neon.git repo
      60              :     //! for faster iteration.
      61              : 
      62              :     pub(crate) mod aligned_buffer;
      63              :     pub(crate) mod io_buf_aligned;
      64              :     pub(crate) mod io_buf_ext;
      65              :     pub(crate) mod slice;
      66              :     pub(crate) mod write;
      67              : }
      68              : 
      69              : #[derive(Debug)]
      70              : pub struct VirtualFile {
      71              :     inner: VirtualFileInner,
      72              :     _mode: IoMode,
      73              : }
      74              : 
      75              : impl VirtualFile {
      76              :     /// Open a file in read-only mode. Like File::open.
      77              :     ///
       78              :     /// Insensitive to the `virtual_file_io_mode` setting.
      79          516 :     pub async fn open<P: AsRef<Utf8Path>>(
      80          516 :         path: P,
      81          516 :         ctx: &RequestContext,
      82          516 :     ) -> Result<Self, std::io::Error> {
      83          516 :         let inner = VirtualFileInner::open(path, ctx).await?;
      84          516 :         Ok(VirtualFile {
      85          516 :             inner,
      86          516 :             _mode: IoMode::Buffered,
      87          516 :         })
      88          516 :     }
      89              : 
      90              :     /// Open a file in read-only mode. Like File::open.
      91              :     ///
       92              :     /// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
      93          647 :     pub async fn open_v2<P: AsRef<Utf8Path>>(
      94          647 :         path: P,
      95          647 :         ctx: &RequestContext,
      96          647 :     ) -> Result<Self, std::io::Error> {
      97          647 :         Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
      98          647 :     }
      99              : 
      100              :     /// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
     101         2654 :     pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
     102         2654 :         path: P,
     103         2654 :         mut open_options: OpenOptions,
     104         2654 :         ctx: &RequestContext,
     105         2654 :     ) -> Result<Self, std::io::Error> {
     106         2654 :         let mode = get_io_mode();
     107         2654 :         let direct = match (mode, open_options.is_write()) {
     108            0 :             (IoMode::Buffered, _) => false,
     109            0 :             (IoMode::Direct, false) => true,
     110            0 :             (IoMode::Direct, true) => false,
     111         2654 :             (IoMode::DirectRw, _) => true,
     112              :         };
     113         2654 :         open_options = open_options.direct(direct);
     114         2654 :         let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     115         2654 :         Ok(VirtualFile { inner, _mode: mode })
     116         2654 :     }
     117              : 
     118          806 :     pub fn path(&self) -> &Utf8Path {
     119          806 :         self.inner.path.as_path()
     120          806 :     }
     121              : 
     122           11 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     123           11 :         final_path: Utf8PathBuf,
     124           11 :         tmp_path: Utf8PathBuf,
     125           11 :         content: B,
     126           11 :     ) -> std::io::Result<()> {
     127           11 :         VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
     128           11 :     }
     129              : 
     130         1452 :     pub async fn sync_all(&self) -> Result<(), Error> {
     131         1452 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     132            0 :             return Ok(());
     133         1452 :         }
     134         1452 :         self.inner.sync_all().await
     135         1452 :     }
     136              : 
     137            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     138            0 :         if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
     139            0 :             return Ok(());
     140            0 :         }
     141            0 :         self.inner.sync_data().await
     142            0 :     }
     143              : 
     144            7 :     pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
     145            7 :         self.inner.set_len(len, ctx).await
     146            7 :     }
     147              : 
     148          930 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     149          930 :         self.inner.metadata().await
     150          930 :     }
     151              : 
     152       256034 :     pub async fn read_exact_at<Buf>(
     153       256034 :         &self,
     154       256034 :         slice: Slice<Buf>,
     155       256034 :         offset: u64,
     156       256034 :         ctx: &RequestContext,
     157       256034 :     ) -> Result<Slice<Buf>, Error>
     158       256034 :     where
     159       256034 :         Buf: IoBufAlignedMut + Send,
     160       256034 :     {
     161       256034 :         self.inner.read_exact_at(slice, offset, ctx).await
     162       256034 :     }
     163              : 
     164        17319 :     pub async fn read_exact_at_page(
     165        17319 :         &self,
     166        17319 :         page: PageWriteGuard<'static>,
     167        17319 :         offset: u64,
     168        17319 :         ctx: &RequestContext,
     169        17319 :     ) -> Result<PageWriteGuard<'static>, Error> {
     170        17319 :         self.inner.read_exact_at_page(page, offset, ctx).await
     171        17319 :     }
     172              : 
     173        19808 :     pub async fn write_all_at<Buf: IoBufAligned + Send>(
     174        19808 :         &self,
     175        19808 :         buf: FullSlice<Buf>,
     176        19808 :         offset: u64,
     177        19808 :         ctx: &RequestContext,
     178        19808 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     179        19808 :         self.inner.write_all_at(buf, offset, ctx).await
     180        19808 :     }
     181              : 
     182            0 :     pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
     183            0 :         path: P,
     184            0 :         ctx: &RequestContext,
     185            0 :     ) -> std::io::Result<String> {
     186            0 :         let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
     187            0 :         let mut buf = Vec::new();
     188            0 :         let mut tmp = vec![0; 128];
     189            0 :         let mut pos: u64 = 0;
     190              :         loop {
     191            0 :             let slice = tmp.slice(..128);
     192            0 :             let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
     193            0 :             match res {
     194            0 :                 Ok(0) => break,
     195            0 :                 Ok(n) => {
     196            0 :                     pos += n as u64;
     197            0 :                     buf.extend_from_slice(&slice[..n]);
     198            0 :                 }
     199            0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     200            0 :                 Err(e) => return Err(e),
     201              :             }
     202            0 :             tmp = slice.into_inner();
     203              :         }
     204            0 :         String::from_utf8(buf).map_err(|_| {
     205            0 :             std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
     206            0 :         })
     207            0 :     }
     208              : }
     209              : 
     210              : /// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing
     211              : /// files. Switching this off is unsafe and only used for testing on machines
     212              : /// with slow drives.
     213              : #[repr(u8)]
     214              : pub enum SyncMode {
     215              :     Sync,
     216              :     UnsafeNoSync,
     217              : }
     218              : 
     219              : impl TryFrom<u8> for SyncMode {
     220              :     type Error = u8;
     221              : 
     222            0 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
     223            0 :         Ok(match value {
     224            0 :             v if v == (SyncMode::Sync as u8) => SyncMode::Sync,
     225            0 :             v if v == (SyncMode::UnsafeNoSync as u8) => SyncMode::UnsafeNoSync,
     226            0 :             x => return Err(x),
     227              :         })
     228            0 :     }
     229              : }
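// Illustrative sketch (not part of the original file): `SYNC_MODE` is stored
// in an AtomicU8 elsewhere in this file, so the TryFrom impl above is the
// decode half of a u8 round-trip. `load_sync_mode` is a hypothetical helper.
fn load_sync_mode() -> SyncMode {
    let raw = SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed);
    SyncMode::try_from(raw).expect("SYNC_MODE only holds valid discriminants")
}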
     230              : 
     231              : ///
     232              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
     233              : /// the underlying file is closed if the system is low on file descriptors,
     234              : /// and re-opened when it's accessed again.
     235              : ///
     236              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      237              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
     238              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
     239              : /// require a mutable reference, because they modify the "current position".
     240              : ///
     241              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      242              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
     243              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
     244              : /// 'tag' field is used to detect whether the handle still is valid or not.
     245              : ///
     246              : #[derive(Debug)]
     247              : pub struct VirtualFileInner {
     248              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
     249              :     /// might contain our File, or it may be empty, or it may contain a File that
     250              :     /// belongs to a different VirtualFile.
     251              :     handle: RwLock<SlotHandle>,
     252              : 
     253              :     /// File path and options to use to open it.
     254              :     ///
     255              :     /// Note: this only contains the options needed to re-open it. For example,
     256              :     /// if a new file is created, we only pass the create flag when it's initially
     257              :     /// opened, in the VirtualFile::create() function, and strip the flag before
     258              :     /// storing it here.
     259              :     pub path: Utf8PathBuf,
     260              :     open_options: OpenOptions,
     261              : }
     262              : 
     263              : #[derive(Debug, PartialEq, Clone, Copy)]
     264              : struct SlotHandle {
     265              :     /// Index into OPEN_FILES.slots
     266              :     index: usize,
     267              : 
     268              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     269              :     /// been recycled and no longer contains the FD for this virtual file.
     270              :     tag: u64,
     271              : }
     272              : 
     273              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     274              : /// are currently open. Each slot in the array is protected by a separate lock,
     275              : /// so that different files can be accessed independently. The lock must be held
     276              : /// in write mode to replace the slot with a different file, but a read mode
     277              : /// is enough to operate on the file, whether you're reading or writing to it.
     278              : ///
     279              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     280              : /// the virtual_file::init() function. It must be called exactly once at page
     281              : /// server startup.
     282              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     283              : 
     284              : struct OpenFiles {
     285              :     slots: &'static [Slot],
     286              : 
     287              :     /// clock arm for the clock algorithm
     288              :     next: AtomicUsize,
     289              : }
     290              : 
     291              : struct Slot {
     292              :     inner: RwLock<SlotInner>,
     293              : 
     294              :     /// has this file been used since last clock sweep?
     295              :     recently_used: AtomicBool,
     296              : }
     297              : 
     298              : struct SlotInner {
     299              :     /// Counter that's incremented every time a different file is stored here.
     300              :     /// To avoid the ABA problem.
     301              :     tag: u64,
     302              : 
     303              :     /// the underlying file
     304              :     file: Option<OwnedFd>,
     305              : }
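// Sketch (illustrative, not part of the original file): how the `tag` defeats
// the ABA problem. A cached SlotHandle is only trusted if the slot still
// carries the same tag; every eviction bumps the tag, so a handle whose slot
// was recycled for another file fails this check even though the index matches.
fn handle_points_at_our_file(handle: &SlotHandle, slot_inner: &SlotInner) -> bool {
    slot_inner.tag == handle.tag && slot_inner.file.is_some()
}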
     306              : 
     307              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     308              : struct PageWriteGuardBuf {
     309              :     page: PageWriteGuard<'static>,
     310              : }
     311              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     312              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     313              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     314              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     315              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     316        69276 :     fn stable_ptr(&self) -> *const u8 {
     317        69276 :         self.page.as_ptr()
     318        69276 :     }
     319       121233 :     fn bytes_init(&self) -> usize {
     320       121233 :         self.page.len()
     321       121233 :     }
     322        51957 :     fn bytes_total(&self) -> usize {
     323        51957 :         self.page.len()
     324        51957 :     }
     325              : }
     326              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     327              : // hence it's safe to hand out the `stable_mut_ptr()`.
     328              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     329        17319 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     330        17319 :         self.page.as_mut_ptr()
     331        17319 :     }
     332              : 
     333        17319 :     unsafe fn set_init(&mut self, pos: usize) {
     334              :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     335        17319 :         assert!(pos <= self.page.len());
     336        17319 :     }
     337              : }
     338              : 
     339              : impl OpenFiles {
     340              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     341              :     ///
     342              :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
      343              :     /// and recently_used has been set. It's all ready for reuse.
     344        99022 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     345              :         //
     346              :         // Run the clock algorithm to find a slot to replace.
     347              :         //
     348        99022 :         let num_slots = self.slots.len();
     349        99022 :         let mut retries = 0;
     350              :         let mut slot;
     351              :         let mut slot_guard;
     352              :         let index;
     353              :         loop {
     354      1679852 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     355      1679852 :             slot = &self.slots[next];
     356              : 
     357              :             // If the recently_used flag on this slot is set, continue the clock
     358              :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     359              :             // lock, also continue the clock sweep.
     360              :             //
     361              :             // We only continue in this manner for a while, though. If we loop
     362              :             // through the array twice without finding a victim, just pick the
     363              :             // next slot and wait until we can reuse it. This way, we avoid
     364              :             // spinning in the extreme case that all the slots are busy with an
     365              :             // I/O operation.
     366      1679852 :             if retries < num_slots * 2 {
     367      1603521 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     368      1498576 :                     if let Ok(guard) = slot.inner.try_write() {
     369        22691 :                         slot_guard = guard;
     370        22691 :                         index = next;
     371        22691 :                         break;
     372      1475885 :                     }
     373       104945 :                 }
     374      1580830 :                 retries += 1;
     375              :             } else {
     376        76331 :                 slot_guard = slot.inner.write().await;
     377        76331 :                 index = next;
     378        76331 :                 break;
     379              :             }
     380              :         }
     381              : 
     382              :         //
     383              :         // We now have the victim slot locked. If it was in use previously, close the
     384              :         // old file.
     385              :         //
     386        99022 :         if let Some(old_file) = slot_guard.file.take() {
     387              :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     388              :             // distinguish the two.
     389        96433 :             STORAGE_IO_TIME_METRIC
     390        96433 :                 .get(StorageIoOperation::CloseByReplace)
     391        96433 :                 .observe_closure_duration(|| drop(old_file));
     392         2589 :         }
     393              : 
     394              :         // Prepare the slot for reuse and return it
     395        99022 :         slot_guard.tag += 1;
     396        99022 :         slot.recently_used.store(true, Ordering::Relaxed);
     397        99022 :         (
     398        99022 :             SlotHandle {
     399        99022 :                 index,
     400        99022 :                 tag: slot_guard.tag,
     401        99022 :             },
     402        99022 :             slot_guard,
     403        99022 :         )
     404        99022 :     }
     405              : }
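// Minimal sketch of the clock ("second chance") sweep in find_victim_slot,
// reduced to plain bools (no slot locks, no tags); names are illustrative.
fn clock_find_victim(recently_used: &mut [bool], next: &mut usize) -> usize {
    loop {
        let i = *next % recently_used.len();
        *next += 1;
        if recently_used[i] {
            // Recently used: give the slot a second chance and keep sweeping.
            recently_used[i] = false;
        } else {
            // Not used since the last sweep: evict this one.
            return i;
        }
    }
}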
     406              : 
      407              : /// Identify error types that should always terminate the process.  Other
      408              : /// error types may be eligible for retry.
     409            1 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     410              :     use nix::errno::Errno::*;
     411            1 :     match e.raw_os_error().map(nix::errno::Errno::from_raw) {
     412              :         Some(EIO) => {
     413              :             // Terminate on EIO because we no longer trust the device to store
     414              :             // data safely, or to uphold persistence guarantees on fsync.
     415            0 :             true
     416              :         }
     417              :         Some(EROFS) => {
     418              :             // Terminate on EROFS because a filesystem is usually remounted
     419              :             // readonly when it has experienced some critical issue, so the same
     420              :             // logic as EIO applies.
     421            0 :             true
     422              :         }
     423              :         Some(EACCES) => {
      424              :             // Terminate on EACCES because we should always have permissions
     425              :             // for our own data dir: if we don't, then we can't do our job and
     426              :             // need administrative intervention to fix permissions.  Terminating
     427              :             // is the best way to make sure we stop cleanly rather than going
     428              :             // into infinite retry loops, and will make it clear to the outside
     429              :             // world that we need help.
     430            0 :             true
     431              :         }
     432              :         _ => {
      433              :             // Treat all other local file I/O errors as retryable.  This includes:
     434              :             // - ENOSPC: we stay up and wait for eviction to free some space
     435              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
      436              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     437            1 :             false
     438              :         }
     439              :     }
     440            1 : }
     441              : 
     442              : /// Call this when the local filesystem gives us an error with an external
      443              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     444              : /// bad storage or bad configuration, and we can't fix that from inside
     445              : /// a running process.
     446            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     447            0 :     let backtrace = std::backtrace::Backtrace::force_capture();
      448            0 :     tracing::error!("Fatal I/O error: {e}: {context}\n{backtrace}");
     449            0 :     std::process::abort();
     450              : }
     451              : 
     452              : pub(crate) trait MaybeFatalIo<T> {
     453              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     454              :     fn fatal_err(self, context: &str) -> T;
     455              : }
     456              : 
     457              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     458              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     459              :     ///
      460              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc., but
     461              :     /// not on ENOSPC.
     462       592007 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     463       592007 :         if let Err(e) = &self {
     464            1 :             if is_fatal_io_error(e) {
     465            0 :                 on_fatal_io_error(e, context);
     466            1 :             }
     467       592006 :         }
     468       592007 :         self
     469       592007 :     }
     470              : 
     471              :     /// Terminate the process on any I/O error.
     472              :     ///
     473              :     /// This is appropriate for reads on files that we know exist: they should always work.
     474         1051 :     fn fatal_err(self, context: &str) -> T {
     475         1051 :         match self {
     476         1051 :             Ok(v) => v,
     477            0 :             Err(e) => {
     478            0 :                 on_fatal_io_error(&e, context);
     479              :             }
     480              :         }
     481         1051 :     }
     482              : }
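// Usage sketch (illustrative, not from this file): write-path errors pass
// through unless fatal (EIO/EROFS/EACCES abort the process), while a read of
// a file we know exists may treat any error as fatal.
fn maybe_fatal_io_example(
    write_res: std::io::Result<usize>,
    read_res: std::io::Result<Vec<u8>>,
) {
    // ENOSPC and other retryable errors are returned to the caller.
    let _written = write_res.maybe_fatal_err("writing delta layer");
    // This file should always be readable; any error terminates the process.
    let _bytes = read_res.fatal_err("reading control file");
}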
     483              : 
     484              : /// Observe duration for the given storage I/O operation
     485              : ///
     486              : /// Unlike `observe_closure_duration`, this supports async,
     487              : /// where "support" means that we measure wall clock time.
     488              : macro_rules! observe_duration {
     489              :     ($op:expr, $($body:tt)*) => {{
     490              :         let instant = Instant::now();
     491              :         let result = $($body)*;
     492              :         let elapsed = instant.elapsed().as_secs_f64();
     493              :         STORAGE_IO_TIME_METRIC
     494              :             .get($op)
     495              :             .observe(elapsed);
     496              :         result
     497              :     }}
     498              : }
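// Usage sketch (illustrative; it mirrors the real call sites below): unlike
// observe_closure_duration, the timed region may contain `.await` points,
// and the metric then records wall-clock time including suspension.
async fn timed_sync_all_sketch(inner: &VirtualFileInner) -> Result<(), Error> {
    let file_guard = inner.lock_file().await?;
    observe_duration!(StorageIoOperation::Fsync, {
        let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
        res
    })
}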
     499              : 
     500              : macro_rules! with_file {
     501              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     502              :         let $ident = $this.lock_file().await?;
     503              :         observe_duration!($op, $($body)*)
     504              :     }};
     505              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     506              :         let mut $ident = $this.lock_file().await?;
     507              :         observe_duration!($op, $($body)*)
     508              :     }};
     509              : }
     510              : 
     511              : impl VirtualFileInner {
     512              :     /// Open a file in read-only mode. Like File::open.
     513          516 :     pub async fn open<P: AsRef<Utf8Path>>(
     514          516 :         path: P,
     515          516 :         ctx: &RequestContext,
     516          516 :     ) -> Result<VirtualFileInner, std::io::Error> {
     517          516 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     518          516 :     }
     519              : 
     520              :     /// Open a file with given options.
     521              :     ///
     522              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
      523              :     /// they will also be applied when the file is subsequently re-opened, not only
      524              :     /// the first time. Make sure that's sane!
     525         3170 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     526         3170 :         path: P,
     527         3170 :         open_options: OpenOptions,
     528         3170 :         _ctx: &RequestContext,
     529         3170 :     ) -> Result<VirtualFileInner, std::io::Error> {
     530         3170 :         let path = path.as_ref();
     531         3170 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     532              : 
     533              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     534              :         // where our caller doesn't get to use the returned VirtualFile before its
     535              :         // slot gets re-used by someone else.
     536         3170 :         let file = observe_duration!(StorageIoOperation::Open, {
     537         3170 :             open_options.open(path.as_std_path()).await?
     538              :         });
     539              : 
     540              :         // Strip all options other than read and write.
     541              :         //
     542              :         // It would perhaps be nicer to check just for the read and write flags
     543              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     544              :         // only to set them.
     545         3170 :         let reopen_options = open_options
     546         3170 :             .clone()
     547         3170 :             .create(false)
     548         3170 :             .create_new(false)
     549         3170 :             .truncate(false);
     550              : 
     551         3170 :         let vfile = VirtualFileInner {
     552         3170 :             handle: RwLock::new(handle),
     553         3170 :             path: path.to_owned(),
     554         3170 :             open_options: reopen_options,
     555         3170 :         };
     556              : 
     557              :         // TODO: Under pressure, it's likely the slot will get re-used and
     558              :         // the underlying file closed before they get around to using it.
     559              :         // => https://github.com/neondatabase/neon/issues/6065
     560         3170 :         slot_guard.file.replace(file);
     561              : 
     562         3170 :         Ok(vfile)
     563         3170 :     }
     564              : 
     565              :     /// Async version of [`::utils::crashsafe::overwrite`].
     566              :     ///
     567              :     /// # NB:
     568              :     ///
      569              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but
      570              :     /// it did at an earlier time.
      571              :     /// And it will use this module's [`io_engine`] in the near future, so it stays here.
     572           14 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     573           14 :         final_path: Utf8PathBuf,
     574           14 :         tmp_path: Utf8PathBuf,
     575           14 :         content: B,
     576           14 :     ) -> std::io::Result<()> {
     577              :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     578              :         // See https://github.com/neondatabase/neon/issues/6663
     579              : 
     580           14 :         tokio::task::spawn_blocking(move || {
     581              :             let slice_storage;
     582           14 :             let content_len = content.bytes_init();
     583           14 :             let content = if content.bytes_init() > 0 {
     584           14 :                 slice_storage = Some(content.slice(0..content_len));
     585           14 :                 slice_storage.as_deref().expect("just set it to Some()")
     586              :             } else {
     587            0 :                 &[]
     588              :             };
     589           14 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     590           14 :                 .maybe_fatal_err("crashsafe_overwrite")
     591           14 :         })
     592           14 :         .await
     593           14 :         .expect("blocking task is never aborted")
     594           14 :     }
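    // Sketch of the general pattern behind utils::crashsafe::overwrite
    // (simplified and illustrative; the real helper is the source of truth):
    // write to a temp file, fsync it, rename over the final path, then fsync
    // the parent directory so the rename itself is durable across a crash.
    fn crashsafe_overwrite_sketch(
        final_path: &std::path::Path,
        tmp_path: &std::path::Path,
        content: &[u8],
    ) -> std::io::Result<()> {
        use std::io::Write as _;
        let mut tmp = std::fs::File::create(tmp_path)?;
        tmp.write_all(content)?;
        tmp.sync_all()?; // data must be durable before the rename exposes it
        std::fs::rename(tmp_path, final_path)?;
        if let Some(dir) = final_path.parent() {
            std::fs::File::open(dir)?.sync_all()?; // persist the rename itself
        }
        Ok(())
    }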
     595              : 
     596              :     /// Call File::sync_all() on the underlying File.
     597         1452 :     pub async fn sync_all(&self) -> Result<(), Error> {
     598         1452 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     599         1452 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     600         1452 :             res.maybe_fatal_err("sync_all")
     601              :         })
     602         1452 :     }
     603              : 
     604              :     /// Call File::sync_data() on the underlying File.
     605            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     606            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     607            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     608            0 :             res.maybe_fatal_err("sync_data")
     609              :         })
     610            0 :     }
     611              : 
     612          930 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     613          930 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     614          930 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     615          930 :             res
     616              :         })
     617          930 :     }
     618              : 
     619            7 :     pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
     620            7 :         with_file!(self, StorageIoOperation::SetLen, |file_guard| {
     621            7 :             let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
     622            7 :             res.maybe_fatal_err("set_len")
     623              :         })
     624            7 :     }
     625              : 
     626              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     627              :     /// opens it and evicts some other File if necessary. The passed parameter is
     628              :     /// assumed to be a function available for the physical `File`.
     629              :     ///
     630              :     /// We are doing it via a macro as Rust doesn't support async closures that
      631              :     /// take parameters with lifetimes.
     632       300779 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     633       300779 :         let open_files = get_open_files();
     634              : 
     635        95852 :         let mut handle_guard = {
     636              :             // Read the cached slot handle, and see if the slot that it points to still
     637              :             // contains our File.
     638              :             //
     639              :             // We only need to hold the handle lock while we read the current handle. If
     640              :             // another thread closes the file and recycles the slot for a different file,
     641              :             // we will notice that the handle we read is no longer valid and retry.
     642       300779 :             let mut handle = *self.handle.read().await;
     643              :             loop {
     644              :                 // Check if the slot contains our File
     645              :                 {
     646       356288 :                     let slot = &open_files.slots[handle.index];
     647       356288 :                     let slot_guard = slot.inner.read().await;
     648       356288 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     649              :                         // Found a cached file descriptor.
     650       204927 :                         slot.recently_used.store(true, Ordering::Relaxed);
     651       204927 :                         return Ok(FileGuard { slot_guard });
     652       151361 :                     }
     653              :                 }
     654              : 
     655              :                 // The slot didn't contain our File. We will have to open it ourselves,
     656              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     657              :                 // that no other thread will try to concurrently open the same file.
     658       151361 :                 let handle_guard = self.handle.write().await;
     659              : 
     660              :                 // If another thread changed the handle while we were not holding the lock,
     661              :                 // then the handle might now be valid again. Loop back to retry.
     662       151361 :                 if *handle_guard != handle {
     663        55509 :                     handle = *handle_guard;
     664        55509 :                     continue;
     665        95852 :                 }
     666        95852 :                 break handle_guard;
     667              :             }
     668              :         };
     669              : 
     670              :         // We need to open the file ourselves. The handle in the VirtualFile is
     671              :         // now locked in write-mode. Find a free slot to put it in.
     672        95852 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     673              : 
     674              :         // Re-open the physical file.
      675              :         // NB: we use StorageIoOperation::OpenAfterReplace for this to distinguish this
     676              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     677              :         // of the virtual file descriptor cache.
     678        95852 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     679        95852 :             self.open_options.open(self.path.as_std_path()).await?
     680              :         });
     681              : 
     682              :         // Store the File in the slot and update the handle in the VirtualFile
     683              :         // to point to it.
     684        95852 :         slot_guard.file.replace(file);
     685              : 
     686        95852 :         *handle_guard = handle;
     687              : 
     688        95852 :         Ok(FileGuard {
     689        95852 :             slot_guard: slot_guard.downgrade(),
     690        95852 :         })
     691       300779 :     }
     692              : 
     693              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     694              :     ///
     695              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     696       278582 :     pub async fn read_exact_at<Buf>(
     697       278582 :         &self,
     698       278582 :         slice: Slice<Buf>,
     699       278582 :         offset: u64,
     700       278582 :         ctx: &RequestContext,
     701       278582 :     ) -> Result<Slice<Buf>, Error>
     702       278582 :     where
     703       278582 :         Buf: IoBufAlignedMut + Send,
     704       278582 :     {
     705       278582 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     706       278582 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     707              :         } else {
     708            0 :             None
     709              :         };
     710              : 
     711       278582 :         let original_bounds = slice.bounds();
     712       278582 :         let (buf, res) =
     713       278582 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     714       278582 :         let res = res.map(|_| buf.slice(original_bounds));
     715              : 
     716       278582 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     717       278582 :             if let Ok(slice) = &res {
     718       278582 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     719       278582 :                 assert_eq!(original_bounds, returned_bounds);
     720            0 :             }
     721            0 :         }
     722              : 
     723       278582 :         res
     724       278582 :     }
     725              : 
     726              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     727        17319 :     pub async fn read_exact_at_page(
     728        17319 :         &self,
     729        17319 :         page: PageWriteGuard<'static>,
     730        17319 :         offset: u64,
     731        17319 :         ctx: &RequestContext,
     732        17319 :     ) -> Result<PageWriteGuard<'static>, Error> {
     733        17319 :         let buf = PageWriteGuardBuf { page }.slice_full();
     734        17319 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     735        17319 :         self.read_exact_at(buf, offset, ctx)
     736        17319 :             .await
     737        17319 :             .map(|slice| slice.into_inner().page)
     738        17319 :     }
     739              : 
     740              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     741        19808 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     742        19808 :         &self,
     743        19808 :         buf: FullSlice<Buf>,
     744        19808 :         mut offset: u64,
     745        19808 :         ctx: &RequestContext,
     746        19808 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     747        19808 :         let buf = buf.into_raw_slice();
     748        19808 :         let bounds = buf.bounds();
     749        19808 :         let restore =
     750        19808 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     751        19808 :         let mut buf = buf;
     752        39616 :         while !buf.is_empty() {
     753        19808 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     754        19808 :             buf = tmp.into_raw_slice();
     755            0 :             match res {
     756              :                 Ok(0) => {
     757            0 :                     return (
     758            0 :                         restore(buf),
     759            0 :                         Err(Error::new(
     760            0 :                             std::io::ErrorKind::WriteZero,
     761            0 :                             "failed to write whole buffer",
     762            0 :                         )),
     763            0 :                     );
     764              :                 }
     765        19808 :                 Ok(n) => {
     766        19808 :                     buf = buf.slice(n..);
     767        19808 :                     offset += n as u64;
     768        19808 :                 }
     769            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     770            0 :                 Err(e) => return (restore(buf), Err(e)),
     771              :             }
     772              :         }
     773        19808 :         (restore(buf), Ok(()))
     774        19808 :     }
     775              : 
     776       278582 :     pub(super) async fn read_at<Buf>(
     777       278582 :         &self,
     778       278582 :         buf: tokio_epoll_uring::Slice<Buf>,
     779       278582 :         offset: u64,
     780       278582 :         ctx: &RequestContext,
     781       278582 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     782       278582 :     where
     783       278582 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     784       278582 :     {
     785       278582 :         self.validate_direct_io(
     786       278582 :             Slice::stable_ptr(&buf).addr(),
     787       278582 :             Slice::bytes_total(&buf),
     788       278582 :             offset,
     789              :         );
     790              : 
     791       278582 :         let file_guard = match self
     792       278582 :             .lock_file()
     793       278582 :             .await
     794       278582 :             .maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
     795              :         {
     796       278582 :             Ok(file_guard) => file_guard,
     797            0 :             Err(e) => return (buf, Err(e)),
     798              :         };
     799              : 
     800       278582 :         observe_duration!(StorageIoOperation::Read, {
     801       278582 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     802       278582 :             let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
     803       278582 :             if let Ok(size) = res {
     804       278582 :                 ctx.io_size_metrics().read.add(size.into_u64());
     805       278582 :             }
     806       278582 :             (buf, res)
     807              :         })
     808       278582 :     }
     809              : 
     810        19808 :     async fn write_at<B: IoBuf + Send>(
     811        19808 :         &self,
     812        19808 :         buf: FullSlice<B>,
     813        19808 :         offset: u64,
     814        19808 :         ctx: &RequestContext,
     815        19808 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     816        19808 :         self.validate_direct_io(buf.as_ptr().addr(), buf.len(), offset);
     817              : 
     818        19808 :         let file_guard = match self.lock_file().await {
     819        19808 :             Ok(file_guard) => file_guard,
     820            0 :             Err(e) => return (buf, Err(e)),
     821              :         };
     822        19808 :         observe_duration!(StorageIoOperation::Write, {
     823        19808 :             let ((_file_guard, buf), result) =
     824        19808 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     825        19808 :             let result = result.maybe_fatal_err("write_at");
     826        19808 :             if let Ok(size) = result {
     827        19808 :                 ctx.io_size_metrics().write.add(size.into_u64());
     828        19808 :             }
     829        19808 :             (buf, result)
     830              :         })
     831        19808 :     }
     832              : 
     833              :     /// Validate all reads and writes to adhere to the O_DIRECT requirements of our production systems.
     834              :     ///
      835              :     /// Validating it in userspace sets a consistent bar, independent of the actual OS/filesystem/block device in use.
     836       298390 :     fn validate_direct_io(&self, addr: usize, size: usize, offset: u64) {
     837              :         // TODO: eventually enable validation in the builds we use in real environments like staging, preprod, and prod.
     838       298390 :         if !(cfg!(feature = "testing") || cfg!(test)) {
     839            0 :             return;
     840       298390 :         }
     841       298390 :         if !self.open_options.is_direct() {
     842            0 :             return;
     843       298390 :         }
     844              : 
     845              :         // Validate buffer memory alignment.
     846              :         //
     847              :         // What practically matters as of Linux 6.1 is bdev_dma_alignment()
     848              :         // which is practically between 512 and 4096.
     849              :         // On our production systems, the value is 512.
     850              :         // The IoBuffer/IoBufferMut hard-code that value.
     851              :         //
      852              :         // Because the allocator might return _more_ aligned addresses than requested,
     853              :         // there is a chance that testing would not catch violations of a runtime requirement stricter than 512.
     854              :         {
     855       298390 :             let requirement = 512;
     856       298390 :             let remainder = addr % requirement;
     857       298390 :             assert!(
     858       298390 :                 remainder == 0,
     859            0 :                 "Direct I/O buffer must be aligned: buffer_addr=0x{addr:x} % 0x{requirement:x} = 0x{remainder:x}"
     860              :             );
     861              :         }
     862              : 
     863              :         // Validate offset alignment.
     864              :         //
     865              :         // We hard-code 512 throughout the code base.
     866              :         // So enforce just that and not anything more restrictive.
     867              :         // Even the shallowest testing will expose more restrictive requirements if those ever arise.
     868              :         {
     869       298390 :             let requirement = 512;
     870       298390 :             let remainder = offset % requirement;
     871       298390 :             assert!(
     872       298390 :                 remainder == 0,
     873            0 :                 "Direct I/O offset must be aligned: offset=0x{offset:x} % 0x{requirement:x} = 0x{remainder:x}"
     874              :             );
     875              :         }
     876              : 
     877              :         // Validate buffer size multiple requirement.
     878              :         //
     879              :         // The requirement in Linux 6.1 is bdev_logical_block_size().
     880              :         // On our production systems, that is 512.
     881              :         {
     882       298390 :             let requirement = 512;
     883       298390 :             let remainder = size % requirement;
     884       298390 :             assert!(
     885       298390 :                 remainder == 0,
     886            0 :                 "Direct I/O buffer size must be a multiple of {requirement}: size=0x{size:x} % 0x{requirement:x} = 0x{remainder:x}"
     887              :             );
     888              :         }
     889       298390 :     }
     890              : }
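// Worked example of the O_DIRECT rule that validate_direct_io enforces: buffer
// address, buffer size, and file offset must each be a multiple of 512.
#[cfg(test)]
#[test]
fn direct_io_512_alignment_examples() {
    let aligned = |v: u64| v % 512 == 0;
    assert!(aligned(0) && aligned(512) && aligned(8192)); // would pass
    assert!(!aligned(100) && !aligned(513)); // would trip the assertions
}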
     891              : 
     892              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     893       278586 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     894       278586 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     895       278586 :     mut offset: u64,
     896       278586 :     mut read_at: F,
     897       278586 : ) -> (Buf, std::io::Result<()>)
     898       278586 : where
     899       278586 :     Buf: IoBufMut + Send,
     900       278586 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     901       278586 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     902       278586 : {
     903       557173 :     while buf.bytes_total() != 0 {
     904              :         let res;
     905       278588 :         (buf, res) = read_at(buf, offset).await;
     906            0 :         match res {
     907            1 :             Ok(0) => break,
     908       278587 :             Ok(n) => {
     909       278587 :                 buf = buf.slice(n..);
     910       278587 :                 offset += n as u64;
     911       278587 :             }
     912            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     913            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     914              :         }
     915              :     }
     916              :     // NB: don't use `buf.is_empty()` here; it is from the
     917              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     918              :     // returned by it only covers the initialized portion of `buf`.
     919              :     // Whereas we're interested in ensuring that we filled the entire
     920              :     // buffer that the user passed in.
     921       278586 :     if buf.bytes_total() != 0 {
     922            1 :         (
     923            1 :             buf.into_inner(),
     924            1 :             Err(std::io::Error::new(
     925            1 :                 std::io::ErrorKind::UnexpectedEof,
     926            1 :                 "failed to fill whole buffer",
     927            1 :             )),
     928            1 :         )
     929              :     } else {
     930       278585 :         assert_eq!(buf.len(), buf.bytes_total());
     931       278585 :         (buf.into_inner(), Ok(()))
     932              :     }
     933       278586 : }
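                      : 
                      : // Illustrative note on the `bytes_total()` checks above (hypothetical
                      : // snippet, relying on the `BoundedBuf`/`Deref` semantics used here): for a
                      : // slice over uninitialized capacity, `len()` (the initialized prefix) and
                      : // `bytes_total()` (the window we want filled) start out different:
                      : //
                      : //     let slice = Vec::<u8>::with_capacity(8).slice_full();
                      : //     assert_eq!(slice.bytes_total(), 8); // the full requested window
                      : //     assert_eq!(slice.len(), 0);         // nothing initialized yet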
     934              : 
     935              : #[cfg(test)]
     936              : mod test_read_exact_at_impl {
     937              : 
     938              :     use std::collections::VecDeque;
     939              :     use std::sync::Arc;
     940              : 
     941              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     942              : 
     943              :     use super::read_exact_at_impl;
     944              : 
     945              :     struct Expectation {
     946              :         offset: u64,
     947              :         bytes_total: usize,
     948              :         result: std::io::Result<Vec<u8>>,
     949              :     }
     950              :     struct MockReadAt {
     951              :         expectations: VecDeque<Expectation>,
     952              :     }
     953              : 
     954              :     impl MockReadAt {
     955            6 :         async fn read_at(
     956            6 :             &mut self,
     957            6 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     958            6 :             offset: u64,
     959            6 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     960            6 :             let exp = self
     961            6 :                 .expectations
     962            6 :                 .pop_front()
     963            6 :                 .expect("read_at called but we have no expectations left");
     964            6 :             assert_eq!(exp.offset, offset);
     965            6 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     966            6 :             match exp.result {
     967            6 :                 Ok(bytes) => {
     968            6 :                     assert!(bytes.len() <= buf.bytes_total());
     969            6 :                     buf.put_slice(&bytes);
     970            6 :                     (buf, Ok(bytes.len()))
     971              :                 }
     972            0 :                 Err(e) => (buf, Err(e)),
     973              :             }
     974            6 :         }
     975              :     }
     976              : 
     977              :     impl Drop for MockReadAt {
     978            4 :         fn drop(&mut self) {
     979            4 :             assert_eq!(self.expectations.len(), 0);
     980            4 :         }
     981              :     }
     982              : 
     983              :     #[tokio::test]
     984            1 :     async fn test_basic() {
     985            1 :         let buf = Vec::with_capacity(5).slice_full();
     986            1 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     987            1 :             expectations: VecDeque::from(vec![Expectation {
     988            1 :                 offset: 0,
     989            1 :                 bytes_total: 5,
     990            1 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     991            1 :             }]),
     992            1 :         }));
     993            1 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     994            1 :             let mock_read_at = Arc::clone(&mock_read_at);
     995            1 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     996            1 :         })
     997            1 :         .await;
     998            1 :         assert!(res.is_ok());
     999            1 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
    1000            1 :     }
    1001              : 
    1002              :     #[tokio::test]
    1003            1 :     async fn test_empty_buf_issues_no_syscall() {
    1004            1 :         let buf = Vec::new().slice_full();
    1005            1 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1006            1 :             expectations: VecDeque::new(),
    1007            1 :         }));
    1008            1 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1009            0 :             let mock_read_at = Arc::clone(&mock_read_at);
    1010            0 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1011            0 :         })
    1012            1 :         .await;
    1013            1 :         assert!(res.is_ok());
    1014            1 :     }
    1015              : 
    1016              :     #[tokio::test]
    1017            1 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
    1018            1 :         let buf = Vec::with_capacity(4).slice_full();
    1019            1 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1020            1 :             expectations: VecDeque::from(vec![
    1021            1 :                 Expectation {
    1022            1 :                     offset: 0,
    1023            1 :                     bytes_total: 4,
    1024            1 :                     result: Ok(vec![b'a', b'b']),
    1025            1 :                 },
    1026            1 :                 Expectation {
    1027            1 :                     offset: 2,
    1028            1 :                     bytes_total: 2,
    1029            1 :                     result: Ok(vec![b'c', b'd']),
    1030            1 :                 },
    1031            1 :             ]),
    1032            1 :         }));
    1033            2 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1034            2 :             let mock_read_at = Arc::clone(&mock_read_at);
    1035            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1036            2 :         })
    1037            1 :         .await;
    1038            1 :         assert!(res.is_ok());
    1039            1 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
    1040            1 :     }
    1041              : 
    1042              :     #[tokio::test]
    1043            1 :     async fn test_eof_before_buffer_full() {
    1044            1 :         let buf = Vec::with_capacity(3).slice_full();
    1045            1 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1046            1 :             expectations: VecDeque::from(vec![
    1047            1 :                 Expectation {
    1048            1 :                     offset: 0,
    1049            1 :                     bytes_total: 3,
    1050            1 :                     result: Ok(vec![b'a']),
    1051            1 :                 },
    1052            1 :                 Expectation {
    1053            1 :                     offset: 1,
    1054            1 :                     bytes_total: 2,
    1055            1 :                     result: Ok(vec![b'b']),
    1056            1 :                 },
    1057            1 :                 Expectation {
    1058            1 :                     offset: 2,
    1059            1 :                     bytes_total: 1,
    1060            1 :                     result: Ok(vec![]),
    1061            1 :                 },
    1062            1 :             ]),
    1063            1 :         }));
    1064            3 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1065            3 :             let mock_read_at = Arc::clone(&mock_read_at);
    1066            3 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1067            3 :         })
    1068            1 :         .await;
    1069            1 :         let Err(err) = res else {
    1070            0 :             panic!("should return an error");
    1071              :         };
    1072            1 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    1073            1 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
    1074              :         // buffer contents on error are unspecified
    1075            1 :     }
    1076              : }
    1077              : 
    1078              : struct FileGuard {
    1079              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
    1080              : }
    1081              : 
    1082              : impl AsRef<OwnedFd> for FileGuard {
    1083       300779 :     fn as_ref(&self) -> &OwnedFd {
    1084              :         // This unwrap is safe because we only create `FileGuard`s
    1085              :         // if we know that the file is Some.
    1086       300779 :         self.slot_guard.file.as_ref().unwrap()
    1087       300779 :     }
    1088              : }
    1089              : 
    1090              : impl FileGuard {
    1091              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1092            7 :     fn with_std_file<F, R>(&self, with: F) -> R
    1093            7 :     where
    1094            7 :         F: FnOnce(&File) -> R,
    1095              :     {
    1096              :         // SAFETY:
    1097              :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1098              :         // - `&` usage below: `self` is `&`, hence the Rust type system guarantees there is no `&mut`
    1099            7 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1100            7 :         let res = with(&file);
    1101            7 :         let _ = file.into_raw_fd();
    1102            7 :         res
    1103            7 :     }
    1104              : }
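                      : 
                      : // A minimal standalone sketch (hypothetical helper, not used by this module)
                      : // of the same fd-borrowing pattern as `with_std_file` above; it assumes the
                      : // caller guarantees that `fd` stays open for the duration of the closure.
                      : fn with_borrowed_fd<R>(fd: std::os::fd::RawFd, with: impl FnOnce(&std::fs::File) -> R) -> R {
                      :     use std::os::fd::{FromRawFd, IntoRawFd};
                      :     // SAFETY: `fd` remains open, and ownership is returned via
                      :     // `into_raw_fd()` below, so the temporary `File` never closes it.
                      :     let file = unsafe { std::fs::File::from_raw_fd(fd) };
                      :     let res = with(&file);
                      :     let _ = file.into_raw_fd();
                      :     res
                      : }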
    1105              : 
    1106              : impl tokio_epoll_uring::IoFd for FileGuard {
    1107       300772 :     unsafe fn as_fd(&self) -> RawFd {
    1108       300772 :         let owned_fd: &OwnedFd = self.as_ref();
    1109       300772 :         owned_fd.as_raw_fd()
    1110       300772 :     }
    1111              : }
    1112              : 
    1113              : #[cfg(test)]
    1114              : impl VirtualFile {
    1115         5229 :     pub(crate) async fn read_blk(
    1116         5229 :         &self,
    1117         5229 :         blknum: u32,
    1118         5229 :         ctx: &RequestContext,
    1119         5229 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1120         5229 :         self.inner.read_blk(blknum, ctx).await
    1121         5229 :     }
    1122              : }
    1123              : 
    1124              : #[cfg(test)]
    1125              : impl VirtualFileInner {
    1126         5229 :     pub(crate) async fn read_blk(
    1127         5229 :         &self,
    1128         5229 :         blknum: u32,
    1129         5229 :         ctx: &RequestContext,
    1130         5229 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1131              :         use crate::page_cache::PAGE_SZ;
    1132         5229 :         let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    1133         5229 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1134         5229 :         let slice = self
    1135         5229 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1136         5229 :             .await?;
    1137         5229 :         Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
    1138         5229 :             slice.into_inner(),
    1139         5229 :         ))
    1140         5229 :     }
    1141              : }
    1142              : 
    1143              : impl Drop for VirtualFileInner {
    1144              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1145         2745 :     fn drop(&mut self) {
    1146         2745 :         let handle = self.handle.get_mut();
    1147              : 
    1148         2745 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1149         2745 :             if slot_guard.tag == tag {
    1150         2433 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1151              :                 // for comparison, there is also the "close-by-replace" operation,
    1152              :                 // used for closes done on eviction.
    1153         2433 :                 if let Some(fd) = slot_guard.file.take() {
    1154         2433 :                     STORAGE_IO_TIME_METRIC
    1155         2433 :                         .get(StorageIoOperation::Close)
    1156         2433 :                         .observe_closure_duration(|| drop(fd));
    1157            0 :                 }
    1158          312 :             }
    1159         2745 :         }
    1160              : 
    1161              :         // We don't have async drop so we cannot directly await the lock here.
    1162              :         // Instead, first do a best-effort attempt at closing the underlying
    1163              :         // file descriptor by using `try_write`, and if that fails, spawn
    1164              :         // a tokio task to do it asynchronously: we just want it to be
    1165              :         // cleaned up eventually.
    1166              :         // Most of the time, the `try_write` should succeed though,
    1167              :         // as we have `&mut self` access. In other words, if the slot
    1168              :         // is still occupied by our file, there should be no access from
    1169              :         // other I/O operations; the only other possible place to lock
    1170              :         // the slot is the clock algorithm looking for free slots.
    1171         2745 :         let slot = &get_open_files().slots[handle.index];
    1172         2745 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1173         2744 :             clean_slot(slot, slot_guard, handle.tag);
    1174         2744 :         } else {
    1175            1 :             let tag = handle.tag;
    1176            1 :             tokio::spawn(async move {
    1177            1 :                 let slot_guard = slot.inner.write().await;
    1178            1 :                 clean_slot(slot, slot_guard, tag);
    1179            1 :             });
    1180              :         };
    1181         2745 :     }
    1182              : }
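                      : 
                      : // The try-then-spawn shape above, reduced to a minimal sketch (hypothetical
                      : // type; assumes a tokio runtime is available when drop hits the slow path):
                      : struct DeferredClose(std::sync::Arc<tokio::sync::Mutex<Option<std::fs::File>>>);
                      : 
                      : impl Drop for DeferredClose {
                      :     fn drop(&mut self) {
                      :         if let Ok(mut guard) = self.0.try_lock() {
                      :             guard.take(); // fast path: close the file synchronously
                      :         } else {
                      :             let slot = std::sync::Arc::clone(&self.0);
                      :             tokio::spawn(async move {
                      :                 slot.lock().await.take(); // slow path: close it eventually
                      :             });
                      :         }
                      :     }
                      : }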
    1183              : 
    1184              : impl OwnedAsyncWriter for VirtualFile {
    1185            0 :     async fn write_all_at<Buf: IoBufAligned + Send>(
    1186            0 :         &self,
    1187            0 :         buf: FullSlice<Buf>,
    1188            0 :         offset: u64,
    1189            0 :         ctx: &RequestContext,
    1190            0 :     ) -> (FullSlice<Buf>, std::io::Result<()>) {
    1191            0 :         VirtualFile::write_all_at(self, buf, offset, ctx).await
    1192            0 :     }
    1193            0 :     async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
    1194            0 :         VirtualFile::set_len(self, len, ctx).await
    1195            0 :     }
    1196              : }
    1197              : 
    1198              : impl OpenFiles {
    1199          122 :     fn new(num_slots: usize) -> OpenFiles {
    1200          122 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1201         1220 :         for _ in 0..num_slots {
    1202         1220 :             let slot = Slot {
    1203         1220 :                 recently_used: AtomicBool::new(false),
    1204         1220 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1205         1220 :             };
    1206         1220 :             slots.push(slot);
    1207         1220 :         }
    1208              : 
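                      :         // Leak the allocation to obtain `&'static` slots: the fd cache
                      :         // lives for the entire process lifetime, so it never needs freeing.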
    1209          122 :         OpenFiles {
    1210          122 :             next: AtomicUsize::new(0),
    1211          122 :             slots: Box::leak(slots),
    1212          122 :         }
    1213          122 :     }
    1214              : }
    1215              : 
    1216              : ///
    1217              : /// Initialize the virtual file module. This must be called once at page
    1218              : /// server startup.
    1219              : ///
    1220              : #[cfg(not(test))]
    1221            0 : pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: SyncMode) {
    1222            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1223            0 :         panic!("virtual_file::init called twice");
    1224            0 :     }
    1225            0 :     set_io_mode(mode);
    1226            0 :     io_engine::init(engine);
    1227            0 :     SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
    1228            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1229            0 : }
    1230              : 
    1231              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1232              : 
    1233              : // Get a handle to the global slots array.
    1234       306694 : fn get_open_files() -> &'static OpenFiles {
    1235              :     //
    1236              :     // In unit tests, page server startup doesn't happen and no one calls
    1237              :     // virtual_file::init(). Initialize it here, with a small array.
    1238              :     //
    1239              :     // This applies to the virtual file tests below, but to all other unit
    1240              :     // tests too, so the virtual file facility is always usable in
    1241              :     // unit tests.
    1242              :     //
    1243       306694 :     if cfg!(test) {
    1244       306694 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1245              :     } else {
    1246            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1247              :     }
    1248       306694 : }
    1249              : 
    1250              : /// Gets the io buffer alignment.
    1251            0 : pub(crate) const fn get_io_buffer_alignment() -> usize {
    1252            0 :     DEFAULT_IO_BUFFER_ALIGNMENT
    1253            0 : }
    1254              : 
    1255              : pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
    1256              : pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
    1257              : pub(crate) type IoPageSlice<'a> =
    1258              :     AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
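                      : 
                      : // Illustrative usage of the aligned buffer types above (mirrors the tests
                      : // below; the pointer check is an assumption about `Deref<Target = [u8]>`):
                      : //
                      : //     let buf = IoBufferMut::with_capacity_zeroed(PAGE_SZ);
                      : //     // ConstAlign guarantees the allocation meets the direct-I/O alignment:
                      : //     assert_eq!(buf.as_ptr() as usize % get_io_buffer_alignment(), 0);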
    1259              : 
    1260          122 : static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
    1261              : 
    1262            0 : pub fn set_io_mode(mode: IoMode) {
    1263            0 :     IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
    1264            0 : }
    1265              : 
    1266         2654 : pub(crate) fn get_io_mode() -> IoMode {
    1267         2654 :     IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
    1268         2654 : }
    1269              : 
    1270              : static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
    1271              : 
    1272              : #[cfg(test)]
    1273              : mod tests {
    1274              :     use std::os::unix::fs::FileExt;
    1275              :     use std::sync::Arc;
    1276              : 
    1277              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1278              :     use rand::seq::SliceRandom;
    1279              :     use rand::{Rng, thread_rng};
    1280              : 
    1281              :     use super::*;
    1282              :     use crate::context::DownloadBehavior;
    1283              :     use crate::task_mgr::TaskKind;
    1284              : 
    1285              :     #[tokio::test]
    1286            1 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1287            1 :         let ctx =
    1288            1 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1289            1 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_virtual_files");
    1290            1 :         std::fs::create_dir_all(&testdir)?;
    1291              : 
    1292          203 :         let zeropad512 = |content: &[u8]| {
    1293          203 :             let mut buf = IoBufferMut::with_capacity_zeroed(512);
    1294          203 :             buf[..content.len()].copy_from_slice(content);
    1295          203 :             buf.freeze().slice_len()
    1296          203 :         };
    1297              : 
    1298            1 :         let path_a = testdir.join("file_a");
    1299            1 :         let file_a = VirtualFile::open_with_options_v2(
    1300            1 :             path_a.clone(),
    1301            1 :             OpenOptions::new()
    1302            1 :                 .read(true)
    1303            1 :                 .write(true)
    1304            1 :                 // set create & truncate flags to ensure that when we trigger a reopen later in this test,
    1305            1 :                 // the reopen_options have masked out those flags; if they don't, then
    1306            1 :                 // after the reopen we will fail to read the `content_a` that we write here.
    1307            1 :                 .create(true)
    1308            1 :                 .truncate(true),
    1309            1 :             &ctx,
    1310            1 :         )
    1311            1 :         .await?;
    1312            1 :         let (_, res) = file_a.write_all_at(zeropad512(b"content_a"), 0, &ctx).await;
    1313            1 :         res?;
    1314              : 
    1315            1 :         let path_b = testdir.join("file_b");
    1316            1 :         let file_b = VirtualFile::open_with_options_v2(
    1317            1 :             path_b.clone(),
    1318            1 :             OpenOptions::new()
    1319            1 :                 .read(true)
    1320            1 :                 .write(true)
    1321            1 :                 .create(true)
    1322            1 :                 .truncate(true),
    1323            1 :             &ctx,
    1324            1 :         )
    1325            1 :         .await?;
    1326            1 :         let (_, res) = file_b.write_all_at(zeropad512(b"content_b"), 0, &ctx).await;
    1327            1 :         res?;
    1328              : 
    1329          402 :         let assert_first_512_eq = async |vfile: &VirtualFile, expect: &[u8]| {
    1330          201 :             let buf = vfile
    1331          201 :                 .read_exact_at(IoBufferMut::with_capacity_zeroed(512).slice_full(), 0, &ctx)
    1332          201 :                 .await
    1333          201 :                 .unwrap();
    1334          201 :             assert_eq!(&buf[..], &zeropad512(expect)[..]);
    1335          402 :         };
    1336              : 
    1337              :         // Open a lot of file descriptors / VirtualFile instances.
    1338              :         // Enough to cause some evictions in the fd cache.
    1339              : 
    1340            1 :         let mut file_b_dupes = Vec::new();
    1341          101 :         for _ in 0..100 {
    1342          100 :             let vfile = VirtualFile::open_with_options_v2(
    1343          100 :                 path_b.clone(),
    1344          100 :                 OpenOptions::new().read(true),
    1345          100 :                 &ctx,
    1346          100 :             )
    1347          100 :             .await?;
    1348          100 :             assert_first_512_eq(&vfile, b"content_b").await;
    1349          100 :             file_b_dupes.push(vfile);
    1350              :         }
    1351              : 
    1352              :         // make sure we opened enough files to definitely cause evictions.
    1353            1 :         assert!(file_b_dupes.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1354              : 
    1355              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1356              :         // from it again. The VirtualFile reopens the file internally.
    1357            1 :         assert_first_512_eq(&file_a, b"content_a").await;
    1358              : 
    1359              :         // Check that all the other FDs still work too. Use them in random order for
    1360              :         // good measure.
    1361            1 :         file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
    1362          100 :         for vfile in file_b_dupes.iter_mut() {
    1363          100 :             assert_first_512_eq(vfile, b"content_b").await;
    1364            1 :         }
    1365            1 : 
    1366            1 :         Ok(())
    1367            1 :     }
    1368              : 
    1369              :     /// Test using VirtualFiles from many threads concurrently. This exercises both
    1370              :     /// using a lot of VirtualFiles concurrently, causing evictions, and using the
    1371              :     /// same VirtualFile from multiple threads concurrently.
    1372              :     #[tokio::test]
    1373            1 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1374              :         const SIZE: usize = 8 * 1024;
    1375              :         const VIRTUAL_FILES: usize = 100;
    1376              :         const THREADS: usize = 100;
    1377              :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1378              : 
    1379            1 :         let ctx =
    1380            1 :             RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
    1381            1 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1382            1 :         std::fs::create_dir_all(&testdir)?;
    1383              : 
    1384              :         // Create a test file.
    1385            1 :         let test_file_path = testdir.join("concurrency_test_file");
    1386              :         {
    1387            1 :             let file = File::create(&test_file_path)?;
    1388            1 :             file.write_all_at(&SAMPLE, 0)?;
    1389              :         }
    1390              : 
    1391              :         // Open the file many times.
    1392            1 :         let mut files = Vec::new();
    1393          101 :         for _ in 0..VIRTUAL_FILES {
    1394          100 :             let f = VirtualFile::open_with_options_v2(
    1395          100 :                 &test_file_path,
    1396          100 :                 OpenOptions::new().read(true),
    1397          100 :                 &ctx,
    1398          100 :             )
    1399          100 :             .await?;
    1400          100 :             files.push(f);
    1401              :         }
    1402            1 :         let files = Arc::new(files);
    1403              : 
    1404              :         // Launch many threads, and use the virtual files concurrently in random order.
    1405            1 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1406            1 :             .worker_threads(THREADS)
    1407            1 :             .thread_name("test_vfile_concurrency thread")
    1408            1 :             .build()
    1409            1 :             .unwrap();
    1410            1 :         let mut hdls = Vec::new();
    1411          101 :         for _threadno in 0..THREADS {
    1412          100 :             let files = files.clone();
    1413          100 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1414          100 :             let hdl = rt.spawn(async move {
    1415          100 :                 let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
    1416          100 :                 let mut rng = rand::rngs::OsRng;
    1417       100000 :                 for _ in 1..1000 {
    1418        99900 :                     let f = &files[rng.gen_range(0..files.len())];
    1419        99900 :                     buf = f
    1420        99900 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1421        99900 :                         .await
    1422        99900 :                         .unwrap()
    1423        99900 :                         .into_inner();
    1424        99900 :                     assert!(buf[..] == SAMPLE);
    1425              :                 }
    1426          100 :             });
    1427          100 :             hdls.push(hdl);
    1428              :         }
    1429          101 :         for hdl in hdls {
    1430          100 :             hdl.await?;
    1431              :         }
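                      :         // Dropping a multi-threaded Runtime from within an async context
                      :         // panics ("Cannot drop a runtime in a context where blocking is not
                      :         // allowed"), and this #[tokio::test] body is such a context, so
                      :         // leak the runtime instead.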
    1432            1 :         std::mem::forget(rt);
    1433              : 
    1434            2 :         Ok(())
    1435            1 :     }
    1436              : 
    1437              :     #[tokio::test]
    1438            1 :     async fn test_atomic_overwrite_basic() {
    1439            1 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1440            1 :         std::fs::create_dir_all(&testdir).unwrap();
    1441              : 
    1442            1 :         let path = testdir.join("myfile");
    1443            1 :         let tmp_path = testdir.join("myfile.tmp");
    1444              : 
    1445            1 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1446            1 :             .await
    1447            1 :             .unwrap();
    1448              : 
    1449            1 :         let post = std::fs::read_to_string(&path).unwrap();
    1450            1 :         assert_eq!(post, "foo");
    1451            1 :         assert!(!tmp_path.exists());
    1452              : 
    1453            1 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1454            1 :             .await
    1455            1 :             .unwrap();
    1456              : 
    1457            1 :         let post = std::fs::read_to_string(&path).unwrap();
    1458            1 :         assert_eq!(post, "bar");
    1459            1 :         assert!(!tmp_path.exists());
    1460            1 :     }
    1461              : 
    1462              :     #[tokio::test]
    1463            1 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1464            1 :         let testdir =
    1465            1 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1466            1 :         std::fs::create_dir_all(&testdir).unwrap();
    1467              : 
    1468            1 :         let path = testdir.join("myfile");
    1469            1 :         let tmp_path = testdir.join("myfile.tmp");
    1470              : 
    1471            1 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1472            1 :         assert!(tmp_path.exists());
    1473              : 
    1474            1 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1475            1 :             .await
    1476            1 :             .unwrap();
    1477              : 
    1478            1 :         let post = std::fs::read_to_string(&path).unwrap();
    1479            1 :         assert_eq!(post, "foo");
    1480            1 :         assert!(!tmp_path.exists());
    1481            1 :     }
    1482              : }
        

Generated by: LCOV version 2.1-beta