LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 92.1 % 1120 1031
Test Date: 2024-11-13 18:23:39 Functions: 88.0 % 259 228

            Line data    Source code
       1              : //! VirtualFile is like a normal File, but it's not bound directly to
       2              : //! a file descriptor.
       3              : //!
       4              : //! Instead, the file is opened when it's read from,
       5              : //! and if too many files are open globally in the system, least-recently
       6              : //! used ones are closed.
       7              : //!
       8              : //! To track which files have been recently used, we use the clock algorithm
       9              : //! with a 'recently_used' flag on each slot.
      10              : //!
      11              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      12              : //! src/backend/storage/file/fd.c
      13              : //!
      14              : use crate::context::RequestContext;
      15              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      16              : 
      17              : use crate::page_cache::{PageWriteGuard, PAGE_SZ};
      18              : use crate::tenant::TENANTS_SEGMENT_NAME;
      19              : use camino::{Utf8Path, Utf8PathBuf};
      20              : use once_cell::sync::OnceCell;
      21              : use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
      22              : use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
      23              : use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
      24              : use owned_buffers_io::io_buf_ext::FullSlice;
      25              : use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
      26              : use pageserver_api::shard::TenantShardId;
      27              : use std::fs::File;
      28              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      29              : #[cfg(target_os = "linux")]
      30              : use std::os::unix::fs::OpenOptionsExt;
      31              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      32              : 
      33              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      34              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
      35              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      36              : use tokio::time::Instant;
      37              : 
      38              : pub use pageserver_api::models::virtual_file as api;
      39              : pub(crate) mod io_engine;
      40              : pub use io_engine::feature_test as io_engine_feature_test;
      41              : pub use io_engine::io_engine_for_bench;
      42              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      43              : mod metadata;
      44              : mod open_options;
      45              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      46              : pub(crate) use api::IoMode;
      47              : pub(crate) use io_engine::IoEngineKind;
      48              : pub(crate) use metadata::Metadata;
      49              : pub(crate) use open_options::*;
      50              : 
      51              : pub(crate) mod owned_buffers_io {
      52              :     //! Abstractions for IO with owned buffers.
      53              :     //!
      54              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      55              :     //! reason we need this abstraction.
      56              :     //!
      57              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      58              :     //! but for the time being we're proving out the primitives in the neon.git repo
      59              :     //! for faster iteration.
      60              : 
      61              :     pub(crate) mod aligned_buffer;
      62              :     pub(crate) mod io_buf_aligned;
      63              :     pub(crate) mod io_buf_ext;
      64              :     pub(crate) mod slice;
      65              :     pub(crate) mod write;
      66              :     pub(crate) mod util {
      67              :         pub(crate) mod size_tracking_writer;
      68              :     }
      69              : }
      70              : 
      71              : #[derive(Debug)]
      72              : pub struct VirtualFile {
      73              :     inner: VirtualFileInner,
      74              :     _mode: IoMode,
      75              : }
      76              : 
      77              : impl VirtualFile {
      78              :     /// Open a file in read-only mode. Like File::open.
      79         1048 :     pub async fn open<P: AsRef<Utf8Path>>(
      80         1048 :         path: P,
      81         1048 :         ctx: &RequestContext,
      82         1048 :     ) -> Result<Self, std::io::Error> {
      83         1048 :         let inner = VirtualFileInner::open(path, ctx).await?;
      84         1048 :         Ok(VirtualFile {
      85         1048 :             inner,
      86         1048 :             _mode: IoMode::Buffered,
      87         1048 :         })
      88         1048 :     }
      89              : 
      90              :     /// Open a file in read-only mode. Like File::open.
      91              :     ///
      92              :     /// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
      93         1190 :     pub async fn open_v2<P: AsRef<Utf8Path>>(
      94         1190 :         path: P,
      95         1190 :         ctx: &RequestContext,
      96         1190 :     ) -> Result<Self, std::io::Error> {
      97         1190 :         Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
      98         1190 :     }
      99              : 
     100         1481 :     pub async fn create<P: AsRef<Utf8Path>>(
     101         1481 :         path: P,
     102         1481 :         ctx: &RequestContext,
     103         1481 :     ) -> Result<Self, std::io::Error> {
     104         1481 :         let inner = VirtualFileInner::create(path, ctx).await?;
     105         1481 :         Ok(VirtualFile {
     106         1481 :             inner,
     107         1481 :             _mode: IoMode::Buffered,
     108         1481 :         })
     109         1481 :     }
     110              : 
     111            0 :     pub async fn create_v2<P: AsRef<Utf8Path>>(
     112            0 :         path: P,
     113            0 :         ctx: &RequestContext,
     114            0 :     ) -> Result<Self, std::io::Error> {
     115            0 :         VirtualFile::open_with_options_v2(
     116            0 :             path.as_ref(),
     117            0 :             OpenOptions::new().write(true).create(true).truncate(true),
     118            0 :             ctx,
     119            0 :         )
     120            0 :         .await
     121            0 :     }
     122              : 
     123         1994 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     124         1994 :         path: P,
     125         1994 :         open_options: &OpenOptions,
     126         1994 :         ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     127         1994 :     ) -> Result<Self, std::io::Error> {
     128         1994 :         let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     129         1994 :         Ok(VirtualFile {
     130         1994 :             inner,
     131         1994 :             _mode: IoMode::Buffered,
     132         1994 :         })
     133         1994 :     }
     134              : 
     135         1190 :     pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
     136         1190 :         path: P,
     137         1190 :         open_options: &OpenOptions,
     138         1190 :         ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     139         1190 :     ) -> Result<Self, std::io::Error> {
     140         1190 :         let file = match get_io_mode() {
     141              :             IoMode::Buffered => {
     142         1190 :                 let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
     143         1190 :                 VirtualFile {
     144         1190 :                     inner,
     145         1190 :                     _mode: IoMode::Buffered,
     146         1190 :                 }
     147              :             }
     148              :             #[cfg(target_os = "linux")]
     149              :             IoMode::Direct => {
     150            0 :                 let inner = VirtualFileInner::open_with_options(
     151            0 :                     path,
     152            0 :                     open_options.clone().custom_flags(nix::libc::O_DIRECT),
     153            0 :                     ctx,
     154            0 :                 )
     155            0 :                 .await?;
     156            0 :                 VirtualFile {
     157            0 :                     inner,
     158            0 :                     _mode: IoMode::Direct,
     159            0 :                 }
     160              :             }
     161              :         };
     162         1190 :         Ok(file)
     163         1190 :     }
     164              : 
     165         1156 :     pub fn path(&self) -> &Utf8Path {
     166         1156 :         self.inner.path.as_path()
     167         1156 :     }
     168              : 
     169           22 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     170           22 :         final_path: Utf8PathBuf,
     171           22 :         tmp_path: Utf8PathBuf,
     172           22 :         content: B,
     173           22 :     ) -> std::io::Result<()> {
     174           22 :         VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
     175           22 :     }
     176              : 
     177         2733 :     pub async fn sync_all(&self) -> Result<(), Error> {
     178         2733 :         self.inner.sync_all().await
     179         2733 :     }
     180              : 
     181            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     182            0 :         self.inner.sync_data().await
     183            0 :     }
     184              : 
     185         1730 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     186         1730 :         self.inner.metadata().await
     187         1730 :     }
     188              : 
     189          224 :     pub fn remove(self) {
     190          224 :         self.inner.remove();
     191          224 :     }
     192              : 
     193         5426 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     194         5426 :         self.inner.seek(pos).await
     195         5426 :     }
     196              : 
     197       512798 :     pub async fn read_exact_at<Buf>(
     198       512798 :         &self,
     199       512798 :         slice: Slice<Buf>,
     200       512798 :         offset: u64,
     201       512798 :         ctx: &RequestContext,
     202       512798 :     ) -> Result<Slice<Buf>, Error>
     203       512798 :     where
     204       512798 :         Buf: IoBufAlignedMut + Send,
     205       512798 :     {
     206       512798 :         self.inner.read_exact_at(slice, offset, ctx).await
     207       512798 :     }
     208              : 
     209        32049 :     pub async fn read_exact_at_page(
     210        32049 :         &self,
     211        32049 :         page: PageWriteGuard<'static>,
     212        32049 :         offset: u64,
     213        32049 :         ctx: &RequestContext,
     214        32049 :     ) -> Result<PageWriteGuard<'static>, Error> {
     215        32049 :         self.inner.read_exact_at_page(page, offset, ctx).await
     216        32049 :     }
     217              : 
     218            4 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     219            4 :         &self,
     220            4 :         buf: FullSlice<Buf>,
     221            4 :         offset: u64,
     222            4 :         ctx: &RequestContext,
     223            4 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     224            4 :         self.inner.write_all_at(buf, offset, ctx).await
     225            4 :     }
     226              : 
     227      1136176 :     pub async fn write_all<Buf: IoBuf + Send>(
     228      1136176 :         &mut self,
     229      1136176 :         buf: FullSlice<Buf>,
     230      1136176 :         ctx: &RequestContext,
     231      1136176 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     232      1136176 :         self.inner.write_all(buf, ctx).await
     233      1136176 :     }
     234              : }
     235              : 
     236              : ///
     237              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
     238              : /// the underlying file is closed if the system is low on file descriptors,
     239              : /// and re-opened when it's accessed again.
     240              : ///
     241              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
     242              : /// holding just a shared reference the same VirtualFile, using the read_at() / write_at()
     243              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
     244              : /// require a mutable reference, because they modify the "current position".
     245              : ///
     246              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
     247              : /// slot that 'handle points to, if the underlying file is currently open. If it's not
     248              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
     249              : /// 'tag' field is used to detect whether the handle still is valid or not.
     250              : ///
     251              : #[derive(Debug)]
     252              : pub struct VirtualFileInner {
     253              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
     254              :     /// might contain our File, or it may be empty, or it may contain a File that
     255              :     /// belongs to a different VirtualFile.
     256              :     handle: RwLock<SlotHandle>,
     257              : 
     258              :     /// Current file position
     259              :     pos: u64,
     260              : 
     261              :     /// File path and options to use to open it.
     262              :     ///
     263              :     /// Note: this only contains the options needed to re-open it. For example,
     264              :     /// if a new file is created, we only pass the create flag when it's initially
     265              :     /// opened, in the VirtualFile::create() function, and strip the flag before
     266              :     /// storing it here.
     267              :     pub path: Utf8PathBuf,
     268              :     open_options: OpenOptions,
     269              : 
     270              :     // These are strings becase we only use them for metrics, and those expect strings.
     271              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
     272              :     // strings.
     273              :     tenant_id: String,
     274              :     shard_id: String,
     275              :     timeline_id: String,
     276              : }
     277              : 
     278              : #[derive(Debug, PartialEq, Clone, Copy)]
     279              : struct SlotHandle {
     280              :     /// Index into OPEN_FILES.slots
     281              :     index: usize,
     282              : 
     283              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     284              :     /// been recycled and no longer contains the FD for this virtual file.
     285              :     tag: u64,
     286              : }
     287              : 
     288              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     289              : /// are currently open. Each slot in the array is protected by a separate lock,
     290              : /// so that different files can be accessed independently. The lock must be held
     291              : /// in write mode to replace the slot with a different file, but a read mode
     292              : /// is enough to operate on the file, whether you're reading or writing to it.
     293              : ///
     294              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     295              : /// the virtual_file::init() function. It must be called exactly once at page
     296              : /// server startup.
     297              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     298              : 
     299              : struct OpenFiles {
     300              :     slots: &'static [Slot],
     301              : 
     302              :     /// clock arm for the clock algorithm
     303              :     next: AtomicUsize,
     304              : }
     305              : 
     306              : struct Slot {
     307              :     inner: RwLock<SlotInner>,
     308              : 
     309              :     /// has this file been used since last clock sweep?
     310              :     recently_used: AtomicBool,
     311              : }
     312              : 
     313              : struct SlotInner {
     314              :     /// Counter that's incremented every time a different file is stored here.
     315              :     /// To avoid the ABA problem.
     316              :     tag: u64,
     317              : 
     318              :     /// the underlying file
     319              :     file: Option<OwnedFd>,
     320              : }
     321              : 
     322              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     323              : struct PageWriteGuardBuf {
     324              :     page: PageWriteGuard<'static>,
     325              : }
     326              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     327              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     328              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     329              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     330              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     331       128227 :     fn stable_ptr(&self) -> *const u8 {
     332       128227 :         self.page.as_ptr()
     333       128227 :     }
     334       240414 :     fn bytes_init(&self) -> usize {
     335       240414 :         self.page.len()
     336       240414 :     }
     337        96147 :     fn bytes_total(&self) -> usize {
     338        96147 :         self.page.len()
     339        96147 :     }
     340              : }
     341              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     342              : // hence it's safe to hand out the `stable_mut_ptr()`.
     343              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     344        48089 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     345        48089 :         self.page.as_mut_ptr()
     346        48089 :     }
     347              : 
     348        32049 :     unsafe fn set_init(&mut self, pos: usize) {
     349        32049 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     350        32049 :         assert!(pos <= self.page.len());
     351        32049 :     }
     352              : }
     353              : 
     354              : impl OpenFiles {
     355              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     356              :     ///
     357              :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     358              :     /// recently_used has been set. It's all ready for reuse.
     359       193317 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     360       193317 :         //
     361       193317 :         // Run the clock algorithm to find a slot to replace.
     362       193317 :         //
     363       193317 :         let num_slots = self.slots.len();
     364       193317 :         let mut retries = 0;
     365              :         let mut slot;
     366              :         let mut slot_guard;
     367              :         let index;
     368              :         loop {
     369      2326173 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     370      2326173 :             slot = &self.slots[next];
     371      2326173 : 
     372      2326173 :             // If the recently_used flag on this slot is set, continue the clock
     373      2326173 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     374      2326173 :             // lock, also continue the clock sweep.
     375      2326173 :             //
     376      2326173 :             // We only continue in this manner for a while, though. If we loop
     377      2326173 :             // through the array twice without finding a victim, just pick the
     378      2326173 :             // next slot and wait until we can reuse it. This way, we avoid
     379      2326173 :             // spinning in the extreme case that all the slots are busy with an
     380      2326173 :             // I/O operation.
     381      2326173 :             if retries < num_slots * 2 {
     382      2237411 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     383      2029873 :                     if let Ok(guard) = slot.inner.try_write() {
     384       104555 :                         slot_guard = guard;
     385       104555 :                         index = next;
     386       104555 :                         break;
     387      1925318 :                     }
     388       207538 :                 }
     389      2132856 :                 retries += 1;
     390              :             } else {
     391        88762 :                 slot_guard = slot.inner.write().await;
     392        88762 :                 index = next;
     393        88762 :                 break;
     394              :             }
     395              :         }
     396              : 
     397              :         //
     398              :         // We now have the victim slot locked. If it was in use previously, close the
     399              :         // old file.
     400              :         //
     401       193317 :         if let Some(old_file) = slot_guard.file.take() {
     402       188509 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     403       188509 :             // distinguish the two.
     404       188509 :             STORAGE_IO_TIME_METRIC
     405       188509 :                 .get(StorageIoOperation::CloseByReplace)
     406       188509 :                 .observe_closure_duration(|| drop(old_file));
     407       188509 :         }
     408              : 
     409              :         // Prepare the slot for reuse and return it
     410       193317 :         slot_guard.tag += 1;
     411       193317 :         slot.recently_used.store(true, Ordering::Relaxed);
     412       193317 :         (
     413       193317 :             SlotHandle {
     414       193317 :                 index,
     415       193317 :                 tag: slot_guard.tag,
     416       193317 :             },
     417       193317 :             slot_guard,
     418       193317 :         )
     419       193317 :     }
     420              : }
     421              : 
     422              : /// Identify error types that should alwways terminate the process.  Other
     423              : /// error types may be elegible for retry.
     424            2 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     425              :     use nix::errno::Errno::*;
     426            2 :     match e.raw_os_error().map(nix::errno::from_i32) {
     427              :         Some(EIO) => {
     428              :             // Terminate on EIO because we no longer trust the device to store
     429              :             // data safely, or to uphold persistence guarantees on fsync.
     430            0 :             true
     431              :         }
     432              :         Some(EROFS) => {
     433              :             // Terminate on EROFS because a filesystem is usually remounted
     434              :             // readonly when it has experienced some critical issue, so the same
     435              :             // logic as EIO applies.
     436            0 :             true
     437              :         }
     438              :         Some(EACCES) => {
     439              :             // Terminate on EACCESS because we should always have permissions
     440              :             // for our own data dir: if we don't, then we can't do our job and
     441              :             // need administrative intervention to fix permissions.  Terminating
     442              :             // is the best way to make sure we stop cleanly rather than going
     443              :             // into infinite retry loops, and will make it clear to the outside
     444              :             // world that we need help.
     445            0 :             true
     446              :         }
     447              :         _ => {
     448              :             // Treat all other local file I/O errors are retryable.  This includes:
     449              :             // - ENOSPC: we stay up and wait for eviction to free some space
     450              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     451              :             // - WriteZero, Interrupted: these are used internally VirtualFile
     452            2 :             false
     453              :         }
     454              :     }
     455            2 : }
     456              : 
     457              : /// Call this when the local filesystem gives us an error with an external
     458              : /// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
     459              : /// bad storage or bad configuration, and we can't fix that from inside
     460              : /// a running process.
     461            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     462            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     463            0 :     std::process::abort();
     464              : }
     465              : 
     466              : pub(crate) trait MaybeFatalIo<T> {
     467              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     468              :     fn fatal_err(self, context: &str) -> T;
     469              : }
     470              : 
     471              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     472              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     473              :     ///
     474              :     /// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but
     475              :     /// not on ENOSPC.
     476      1140662 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     477      1140662 :         if let Err(e) = &self {
     478            2 :             if is_fatal_io_error(e) {
     479            0 :                 on_fatal_io_error(e, context);
     480            2 :             }
     481      1140660 :         }
     482      1140662 :         self
     483      1140662 :     }
     484              : 
     485              :     /// Terminate the process on any I/O error.
     486              :     ///
     487              :     /// This is appropriate for reads on files that we know exist: they should always work.
     488         2038 :     fn fatal_err(self, context: &str) -> T {
     489         2038 :         match self {
     490         2038 :             Ok(v) => v,
     491            0 :             Err(e) => {
     492            0 :                 on_fatal_io_error(&e, context);
     493              :             }
     494              :         }
     495         2038 :     }
     496              : }
     497              : 
     498              : /// Observe duration for the given storage I/O operation
     499              : ///
     500              : /// Unlike `observe_closure_duration`, this supports async,
     501              : /// where "support" means that we measure wall clock time.
     502              : macro_rules! observe_duration {
     503              :     ($op:expr, $($body:tt)*) => {{
     504              :         let instant = Instant::now();
     505              :         let result = $($body)*;
     506              :         let elapsed = instant.elapsed().as_secs_f64();
     507              :         STORAGE_IO_TIME_METRIC
     508              :             .get($op)
     509              :             .observe(elapsed);
     510              :         result
     511              :     }}
     512              : }
     513              : 
     514              : macro_rules! with_file {
     515              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     516              :         let $ident = $this.lock_file().await?;
     517              :         observe_duration!($op, $($body)*)
     518              :     }};
     519              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     520              :         let mut $ident = $this.lock_file().await?;
     521              :         observe_duration!($op, $($body)*)
     522              :     }};
     523              : }
     524              : 
     525              : impl VirtualFileInner {
     526              :     /// Open a file in read-only mode. Like File::open.
     527         1048 :     pub async fn open<P: AsRef<Utf8Path>>(
     528         1048 :         path: P,
     529         1048 :         ctx: &RequestContext,
     530         1048 :     ) -> Result<VirtualFileInner, std::io::Error> {
     531         1048 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     532         1048 :     }
     533              : 
     534              :     /// Create a new file for writing. If the file exists, it will be truncated.
     535              :     /// Like File::create.
     536         1481 :     pub async fn create<P: AsRef<Utf8Path>>(
     537         1481 :         path: P,
     538         1481 :         ctx: &RequestContext,
     539         1481 :     ) -> Result<VirtualFileInner, std::io::Error> {
     540         1481 :         Self::open_with_options(
     541         1481 :             path.as_ref(),
     542         1481 :             OpenOptions::new().write(true).create(true).truncate(true),
     543         1481 :             ctx,
     544         1481 :         )
     545          763 :         .await
     546         1481 :     }
     547              : 
     548              :     /// Open a file with given options.
     549              :     ///
     550              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     551              :     /// they will be applied also when the file is subsequently re-opened, not only
     552              :     /// on the first time. Make sure that's sane!
     553         5913 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     554         5913 :         path: P,
     555         5913 :         open_options: &OpenOptions,
     556         5913 :         _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     557         5913 :     ) -> Result<VirtualFileInner, std::io::Error> {
     558         5913 :         let path_ref = path.as_ref();
     559         5913 :         let path_str = path_ref.to_string();
     560         5913 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     561         5913 :         let (tenant_id, shard_id, timeline_id) =
     562         5913 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     563         4419 :                 let tenant_shard_part = parts[parts.len() - 4];
     564         4419 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     565         4419 :                     Ok(tenant_shard_id) => (
     566         4419 :                         tenant_shard_id.tenant_id.to_string(),
     567         4419 :                         format!("{}", tenant_shard_id.shard_slug()),
     568         4419 :                     ),
     569              :                     Err(_) => {
     570              :                         // Malformed path: this ID is just for observability, so tolerate it
     571              :                         // and pass through
     572            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     573              :                     }
     574              :                 };
     575         4419 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     576              :             } else {
     577         1494 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     578              :             };
     579         5913 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     580              : 
     581              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     582              :         // where our caller doesn't get to use the returned VirtualFile before its
     583              :         // slot gets re-used by someone else.
     584         5913 :         let file = observe_duration!(StorageIoOperation::Open, {
     585         5913 :             open_options.open(path_ref.as_std_path()).await?
     586              :         });
     587              : 
     588              :         // Strip all options other than read and write.
     589              :         //
     590              :         // It would perhaps be nicer to check just for the read and write flags
     591              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     592              :         // only to set them.
     593         5913 :         let mut reopen_options = open_options.clone();
     594         5913 :         reopen_options.create(false);
     595         5913 :         reopen_options.create_new(false);
     596         5913 :         reopen_options.truncate(false);
     597         5913 : 
     598         5913 :         let vfile = VirtualFileInner {
     599         5913 :             handle: RwLock::new(handle),
     600         5913 :             pos: 0,
     601         5913 :             path: path_ref.to_path_buf(),
     602         5913 :             open_options: reopen_options,
     603         5913 :             tenant_id,
     604         5913 :             shard_id,
     605         5913 :             timeline_id,
     606         5913 :         };
     607         5913 : 
     608         5913 :         // TODO: Under pressure, it's likely the slot will get re-used and
     609         5913 :         // the underlying file closed before they get around to using it.
     610         5913 :         // => https://github.com/neondatabase/neon/issues/6065
     611         5913 :         slot_guard.file.replace(file);
     612         5913 : 
     613         5913 :         Ok(vfile)
     614         5913 :     }
     615              : 
     616              :     /// Async version of [`::utils::crashsafe::overwrite`].
     617              :     ///
     618              :     /// # NB:
     619              :     ///
     620              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     621              :     /// it did at an earlier time.
     622              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     623           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     624           28 :         final_path: Utf8PathBuf,
     625           28 :         tmp_path: Utf8PathBuf,
     626           28 :         content: B,
     627           28 :     ) -> std::io::Result<()> {
     628           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     629           28 :         // See https://github.com/neondatabase/neon/issues/6663
     630           28 : 
     631           28 :         tokio::task::spawn_blocking(move || {
     632           28 :             let slice_storage;
     633           28 :             let content_len = content.bytes_init();
     634           28 :             let content = if content.bytes_init() > 0 {
     635           28 :                 slice_storage = Some(content.slice(0..content_len));
     636           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     637              :             } else {
     638            0 :                 &[]
     639              :             };
     640           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     641           28 :                 .maybe_fatal_err("crashsafe_overwrite")
     642           28 :         })
     643           28 :         .await
     644           28 :         .expect("blocking task is never aborted")
     645           28 :     }
     646              : 
     647              :     /// Call File::sync_all() on the underlying File.
     648         2733 :     pub async fn sync_all(&self) -> Result<(), Error> {
     649         2733 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     650         2733 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     651         2733 :             res.maybe_fatal_err("sync_all")
     652              :         })
     653         2733 :     }
     654              : 
     655              :     /// Call File::sync_data() on the underlying File.
     656            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     657            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     658            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     659            0 :             res.maybe_fatal_err("sync_data")
     660              :         })
     661            0 :     }
     662              : 
     663         1730 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     664         1730 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     665         1730 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     666         1730 :             res
     667              :         })
     668         1730 :     }
     669              : 
     670              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     671              :     /// opens it and evicts some other File if necessary. The passed parameter is
     672              :     /// assumed to be a function available for the physical `File`.
     673              :     ///
     674              :     /// We are doing it via a macro as Rust doesn't support async closures that
     675              :     /// take on parameters with lifetimes.
     676      1906616 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     677      1906616 :         let open_files = get_open_files();
     678              : 
     679       187404 :         let mut handle_guard = {
     680              :             // Read the cached slot handle, and see if the slot that it points to still
     681              :             // contains our File.
     682              :             //
     683              :             // We only need to hold the handle lock while we read the current handle. If
     684              :             // another thread closes the file and recycles the slot for a different file,
     685              :             // we will notice that the handle we read is no longer valid and retry.
     686      1906616 :             let mut handle = *self.handle.read().await;
     687              :             loop {
     688              :                 // Check if the slot contains our File
     689              :                 {
     690      1998663 :                     let slot = &open_files.slots[handle.index];
     691      1998663 :                     let slot_guard = slot.inner.read().await;
     692      1998663 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     693              :                         // Found a cached file descriptor.
     694      1719212 :                         slot.recently_used.store(true, Ordering::Relaxed);
     695      1719212 :                         return Ok(FileGuard { slot_guard });
     696       279451 :                     }
     697              :                 }
     698              : 
     699              :                 // The slot didn't contain our File. We will have to open it ourselves,
     700              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     701              :                 // that no other thread will try to concurrently open the same file.
     702       279451 :                 let handle_guard = self.handle.write().await;
     703              : 
     704              :                 // If another thread changed the handle while we were not holding the lock,
     705              :                 // then the handle might now be valid again. Loop back to retry.
     706       279451 :                 if *handle_guard != handle {
     707        92047 :                     handle = *handle_guard;
     708        92047 :                     continue;
     709       187404 :                 }
     710       187404 :                 break handle_guard;
     711              :             }
     712              :         };
     713              : 
     714              :         // We need to open the file ourselves. The handle in the VirtualFile is
     715              :         // now locked in write-mode. Find a free slot to put it in.
     716       187404 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     717              : 
     718              :         // Re-open the physical file.
     719              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     720              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     721              :         // of the virtual file descriptor cache.
     722       187404 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     723       187404 :             self.open_options.open(self.path.as_std_path()).await?
     724              :         });
     725              : 
     726              :         // Store the File in the slot and update the handle in the VirtualFile
     727              :         // to point to it.
     728       187404 :         slot_guard.file.replace(file);
     729       187404 : 
     730       187404 :         *handle_guard = handle;
     731       187404 : 
     732       187404 :         Ok(FileGuard {
     733       187404 :             slot_guard: slot_guard.downgrade(),
     734       187404 :         })
     735      1906616 :     }
     736              : 
     737          224 :     pub fn remove(self) {
     738          224 :         let path = self.path.clone();
     739          224 :         drop(self);
     740          224 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     741          224 :     }
     742              : 
     743         5426 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     744         5426 :         match pos {
     745         5416 :             SeekFrom::Start(offset) => {
     746         5416 :                 self.pos = offset;
     747         5416 :             }
     748            4 :             SeekFrom::End(offset) => {
     749            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     750            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     751              :             }
     752            6 :             SeekFrom::Current(offset) => {
     753            6 :                 let pos = self.pos as i128 + offset as i128;
     754            6 :                 if pos < 0 {
     755            2 :                     return Err(Error::new(
     756            2 :                         ErrorKind::InvalidInput,
     757            2 :                         "offset would be negative",
     758            2 :                     ));
     759            4 :                 }
     760            4 :                 if pos > u64::MAX as i128 {
     761            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     762            4 :                 }
     763            4 :                 self.pos = pos as u64;
     764              :             }
     765              :         }
     766         5422 :         Ok(self.pos)
     767         5426 :     }
     768              : 
     769              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     770              :     ///
     771              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     772       765563 :     pub async fn read_exact_at<Buf>(
     773       765563 :         &self,
     774       765563 :         slice: Slice<Buf>,
     775       765563 :         offset: u64,
     776       765563 :         ctx: &RequestContext,
     777       765563 :     ) -> Result<Slice<Buf>, Error>
     778       765563 :     where
     779       765563 :         Buf: IoBufAlignedMut + Send,
     780       765563 :     {
     781       765563 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     782       765563 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     783              :         } else {
     784            0 :             None
     785              :         };
     786              : 
     787       765563 :         let original_bounds = slice.bounds();
     788       765563 :         let (buf, res) =
     789       878141 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     790       765563 :         let res = res.map(|_| buf.slice(original_bounds));
     791              : 
     792       765563 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     793       765563 :             if let Ok(slice) = &res {
     794       765563 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     795       765563 :                 assert_eq!(original_bounds, returned_bounds);
     796            0 :             }
     797            0 :         }
     798              : 
     799       765563 :         res
     800       765563 :     }
     801              : 
     802              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     803        32049 :     pub async fn read_exact_at_page(
     804        32049 :         &self,
     805        32049 :         page: PageWriteGuard<'static>,
     806        32049 :         offset: u64,
     807        32049 :         ctx: &RequestContext,
     808        32049 :     ) -> Result<PageWriteGuard<'static>, Error> {
     809        32049 :         let buf = PageWriteGuardBuf { page }.slice_full();
     810        32049 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     811        32049 :         self.read_exact_at(buf, offset, ctx)
     812        21771 :             .await
     813        32049 :             .map(|slice| slice.into_inner().page)
     814        32049 :     }
     815              : 
     816              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     817            4 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     818            4 :         &self,
     819            4 :         buf: FullSlice<Buf>,
     820            4 :         mut offset: u64,
     821            4 :         ctx: &RequestContext,
     822            4 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     823            4 :         let buf = buf.into_raw_slice();
     824            4 :         let bounds = buf.bounds();
     825            4 :         let restore =
     826            4 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     827            4 :         let mut buf = buf;
     828            8 :         while !buf.is_empty() {
     829            4 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     830            4 :             buf = tmp.into_raw_slice();
     831            0 :             match res {
     832              :                 Ok(0) => {
     833            0 :                     return (
     834            0 :                         restore(buf),
     835            0 :                         Err(Error::new(
     836            0 :                             std::io::ErrorKind::WriteZero,
     837            0 :                             "failed to write whole buffer",
     838            0 :                         )),
     839            0 :                     );
     840              :                 }
     841            4 :                 Ok(n) => {
     842            4 :                     buf = buf.slice(n..);
     843            4 :                     offset += n as u64;
     844            4 :                 }
     845            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     846            0 :                 Err(e) => return (restore(buf), Err(e)),
     847              :             }
     848              :         }
     849            4 :         (restore(buf), Ok(()))
     850            4 :     }
     851              : 
     852              :     /// Writes `buf` to the file at the current offset.
     853              :     ///
     854              :     /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
     855      1136176 :     pub async fn write_all<Buf: IoBuf + Send>(
     856      1136176 :         &mut self,
     857      1136176 :         buf: FullSlice<Buf>,
     858      1136176 :         ctx: &RequestContext,
     859      1136176 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     860      1136176 :         let buf = buf.into_raw_slice();
     861      1136176 :         let bounds = buf.bounds();
     862      1136176 :         let restore =
     863      1136176 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     864      1136176 :         let nbytes = buf.len();
     865      1136176 :         let mut buf = buf;
     866      2272312 :         while !buf.is_empty() {
     867      1136138 :             let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
     868      1136138 :             buf = tmp.into_raw_slice();
     869            2 :             match res {
     870              :                 Ok(0) => {
     871            0 :                     return (
     872            0 :                         restore(buf),
     873            0 :                         Err(Error::new(
     874            0 :                             std::io::ErrorKind::WriteZero,
     875            0 :                             "failed to write whole buffer",
     876            0 :                         )),
     877            0 :                     );
     878              :                 }
     879      1136136 :                 Ok(n) => {
     880      1136136 :                     buf = buf.slice(n..);
     881      1136136 :                 }
     882            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     883            2 :                 Err(e) => return (restore(buf), Err(e)),
     884              :             }
     885              :         }
     886      1136174 :         (restore(buf), Ok(nbytes))
     887      1136176 :     }
     888              : 
     889      1136138 :     async fn write<B: IoBuf + Send>(
     890      1136138 :         &mut self,
     891      1136138 :         buf: FullSlice<B>,
     892      1136138 :         ctx: &RequestContext,
     893      1136138 :     ) -> (FullSlice<B>, Result<usize, std::io::Error>) {
     894      1136138 :         let pos = self.pos;
     895      1136138 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     896      1136138 :         let n = match res {
     897      1136136 :             Ok(n) => n,
     898            2 :             Err(e) => return (buf, Err(e)),
     899              :         };
     900      1136136 :         self.pos += n as u64;
     901      1136136 :         (buf, Ok(n))
     902      1136138 :     }
     903              : 
     904       766007 :     pub(crate) async fn read_at<Buf>(
     905       766007 :         &self,
     906       766007 :         buf: tokio_epoll_uring::Slice<Buf>,
     907       766007 :         offset: u64,
     908       766007 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     909       766007 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     910       766007 :     where
     911       766007 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     912       766007 :     {
     913       766007 :         let file_guard = match self.lock_file().await {
     914       766007 :             Ok(file_guard) => file_guard,
     915            0 :             Err(e) => return (buf, Err(e)),
     916              :         };
     917              : 
     918       766007 :         observe_duration!(StorageIoOperation::Read, {
     919       766007 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     920       766007 :             if let Ok(size) = res {
     921       766005 :                 STORAGE_IO_SIZE
     922       766005 :                     .with_label_values(&[
     923       766005 :                         "read",
     924       766005 :                         &self.tenant_id,
     925       766005 :                         &self.shard_id,
     926       766005 :                         &self.timeline_id,
     927       766005 :                     ])
     928       766005 :                     .add(size as i64);
     929       766005 :             }
     930       766007 :             (buf, res)
     931              :         })
     932       766007 :     }
     933              : 
     934              :     /// The function aborts the process if the error is fatal.
     935      1136142 :     async fn write_at<B: IoBuf + Send>(
     936      1136142 :         &self,
     937      1136142 :         buf: FullSlice<B>,
     938      1136142 :         offset: u64,
     939      1136142 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     940      1136142 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     941      1136142 :         let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
     942      1136142 :         let result = result.maybe_fatal_err("write_at");
     943      1136142 :         (slice, result)
     944      1136142 :     }
     945              : 
     946      1136142 :     async fn write_at_inner<B: IoBuf + Send>(
     947      1136142 :         &self,
     948      1136142 :         buf: FullSlice<B>,
     949      1136142 :         offset: u64,
     950      1136142 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     951      1136142 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     952      1136142 :         let file_guard = match self.lock_file().await {
     953      1136142 :             Ok(file_guard) => file_guard,
     954            0 :             Err(e) => return (buf, Err(e)),
     955              :         };
     956      1136142 :         observe_duration!(StorageIoOperation::Write, {
     957      1136142 :             let ((_file_guard, buf), result) =
     958      1136142 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     959      1136142 :             if let Ok(size) = result {
     960      1136140 :                 STORAGE_IO_SIZE
     961      1136140 :                     .with_label_values(&[
     962      1136140 :                         "write",
     963      1136140 :                         &self.tenant_id,
     964      1136140 :                         &self.shard_id,
     965      1136140 :                         &self.timeline_id,
     966      1136140 :                     ])
     967      1136140 :                     .add(size as i64);
     968      1136140 :             }
     969      1136142 :             (buf, result)
     970              :         })
     971      1136142 :     }
     972              : }
     973              : 
     974              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     975       765571 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     976       765571 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     977       765571 :     mut offset: u64,
     978       765571 :     mut read_at: F,
     979       765571 : ) -> (Buf, std::io::Result<()>)
     980       765571 : where
     981       765571 :     Buf: IoBufMut + Send,
     982       765571 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     983       765571 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     984       765571 : {
     985      1531144 :     while buf.bytes_total() != 0 {
     986              :         let res;
     987       878141 :         (buf, res) = read_at(buf, offset).await;
     988            0 :         match res {
     989            2 :             Ok(0) => break,
     990       765573 :             Ok(n) => {
     991       765573 :                 buf = buf.slice(n..);
     992       765573 :                 offset += n as u64;
     993       765573 :             }
     994            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     995            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     996              :         }
     997              :     }
     998              :     // NB: don't use `buf.is_empty()` here; it is from the
     999              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
    1000              :     // returned by it only covers the initialized portion of `buf`.
    1001              :     // Whereas we're interested in ensuring that we filled the entire
    1002              :     // buffer that the user passed in.
    1003       765571 :     if buf.bytes_total() != 0 {
    1004            2 :         (
    1005            2 :             buf.into_inner(),
    1006            2 :             Err(std::io::Error::new(
    1007            2 :                 std::io::ErrorKind::UnexpectedEof,
    1008            2 :                 "failed to fill whole buffer",
    1009            2 :             )),
    1010            2 :         )
    1011              :     } else {
    1012       765569 :         assert_eq!(buf.len(), buf.bytes_total());
    1013       765569 :         (buf.into_inner(), Ok(()))
    1014              :     }
    1015       765571 : }
    1016              : 
    1017              : #[cfg(test)]
    1018              : mod test_read_exact_at_impl {
    1019              : 
    1020              :     use std::{collections::VecDeque, sync::Arc};
    1021              : 
    1022              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
    1023              : 
    1024              :     use super::read_exact_at_impl;
    1025              : 
    1026              :     struct Expectation {
    1027              :         offset: u64,
    1028              :         bytes_total: usize,
    1029              :         result: std::io::Result<Vec<u8>>,
    1030              :     }
    1031              :     struct MockReadAt {
    1032              :         expectations: VecDeque<Expectation>,
    1033              :     }
    1034              : 
    1035              :     impl MockReadAt {
    1036           12 :         async fn read_at(
    1037           12 :             &mut self,
    1038           12 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
    1039           12 :             offset: u64,
    1040           12 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
    1041           12 :             let exp = self
    1042           12 :                 .expectations
    1043           12 :                 .pop_front()
    1044           12 :                 .expect("read_at called but we have no expectations left");
    1045           12 :             assert_eq!(exp.offset, offset);
    1046           12 :             assert_eq!(exp.bytes_total, buf.bytes_total());
    1047           12 :             match exp.result {
    1048           12 :                 Ok(bytes) => {
    1049           12 :                     assert!(bytes.len() <= buf.bytes_total());
    1050           12 :                     buf.put_slice(&bytes);
    1051           12 :                     (buf, Ok(bytes.len()))
    1052              :                 }
    1053            0 :                 Err(e) => (buf, Err(e)),
    1054              :             }
    1055           12 :         }
    1056              :     }
    1057              : 
    1058              :     impl Drop for MockReadAt {
    1059            8 :         fn drop(&mut self) {
    1060            8 :             assert_eq!(self.expectations.len(), 0);
    1061            8 :         }
    1062              :     }
    1063              : 
    1064              :     #[tokio::test]
    1065            2 :     async fn test_basic() {
    1066            2 :         let buf = Vec::with_capacity(5).slice_full();
    1067            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1068            2 :             expectations: VecDeque::from(vec![Expectation {
    1069            2 :                 offset: 0,
    1070            2 :                 bytes_total: 5,
    1071            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
    1072            2 :             }]),
    1073            2 :         }));
    1074            2 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1075            2 :             let mock_read_at = Arc::clone(&mock_read_at);
    1076            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1077            2 :         })
    1078            2 :         .await;
    1079            2 :         assert!(res.is_ok());
    1080            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
    1081            2 :     }
    1082              : 
    1083              :     #[tokio::test]
    1084            2 :     async fn test_empty_buf_issues_no_syscall() {
    1085            2 :         let buf = Vec::new().slice_full();
    1086            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1087            2 :             expectations: VecDeque::new(),
    1088            2 :         }));
    1089            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1090            0 :             let mock_read_at = Arc::clone(&mock_read_at);
    1091            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1092            2 :         })
    1093            2 :         .await;
    1094            2 :         assert!(res.is_ok());
    1095            2 :     }
    1096              : 
    1097              :     #[tokio::test]
    1098            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
    1099            2 :         let buf = Vec::with_capacity(4).slice_full();
    1100            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1101            2 :             expectations: VecDeque::from(vec![
    1102            2 :                 Expectation {
    1103            2 :                     offset: 0,
    1104            2 :                     bytes_total: 4,
    1105            2 :                     result: Ok(vec![b'a', b'b']),
    1106            2 :                 },
    1107            2 :                 Expectation {
    1108            2 :                     offset: 2,
    1109            2 :                     bytes_total: 2,
    1110            2 :                     result: Ok(vec![b'c', b'd']),
    1111            2 :                 },
    1112            2 :             ]),
    1113            2 :         }));
    1114            4 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1115            4 :             let mock_read_at = Arc::clone(&mock_read_at);
    1116            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1117            4 :         })
    1118            2 :         .await;
    1119            2 :         assert!(res.is_ok());
    1120            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
    1121            2 :     }
    1122              : 
    1123              :     #[tokio::test]
    1124            2 :     async fn test_eof_before_buffer_full() {
    1125            2 :         let buf = Vec::with_capacity(3).slice_full();
    1126            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
    1127            2 :             expectations: VecDeque::from(vec![
    1128            2 :                 Expectation {
    1129            2 :                     offset: 0,
    1130            2 :                     bytes_total: 3,
    1131            2 :                     result: Ok(vec![b'a']),
    1132            2 :                 },
    1133            2 :                 Expectation {
    1134            2 :                     offset: 1,
    1135            2 :                     bytes_total: 2,
    1136            2 :                     result: Ok(vec![b'b']),
    1137            2 :                 },
    1138            2 :                 Expectation {
    1139            2 :                     offset: 2,
    1140            2 :                     bytes_total: 1,
    1141            2 :                     result: Ok(vec![]),
    1142            2 :                 },
    1143            2 :             ]),
    1144            2 :         }));
    1145            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
    1146            6 :             let mock_read_at = Arc::clone(&mock_read_at);
    1147            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
    1148            6 :         })
    1149            2 :         .await;
    1150            2 :         let Err(err) = res else {
    1151            2 :             panic!("should return an error");
    1152            2 :         };
    1153            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    1154            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
    1155            2 :         // buffer contents on error are unspecified
    1156            2 :     }
    1157              : }
    1158              : 
    1159              : struct FileGuard {
    1160              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
    1161              : }
    1162              : 
    1163              : impl AsRef<OwnedFd> for FileGuard {
    1164      1906616 :     fn as_ref(&self) -> &OwnedFd {
    1165      1906616 :         // This unwrap is safe because we only create `FileGuard`s
    1166      1906616 :         // if we know that the file is Some.
    1167      1906616 :         self.slot_guard.file.as_ref().unwrap()
    1168      1906616 :     }
    1169              : }
    1170              : 
    1171              : impl FileGuard {
    1172              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1173       953365 :     fn with_std_file<F, R>(&self, with: F) -> R
    1174       953365 :     where
    1175       953365 :         F: FnOnce(&File) -> R,
    1176       953365 :     {
    1177       953365 :         // SAFETY:
    1178       953365 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1179       953365 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
    1180       953365 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1181       953365 :         let res = with(&file);
    1182       953365 :         let _ = file.into_raw_fd();
    1183       953365 :         res
    1184       953365 :     }
    1185              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1186            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
    1187            4 :     where
    1188            4 :         F: FnOnce(&mut File) -> R,
    1189            4 :     {
    1190            4 :         // SAFETY:
    1191            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1192            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1193            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1194            4 :         let res = with(&mut file);
    1195            4 :         let _ = file.into_raw_fd();
    1196            4 :         res
    1197            4 :     }
    1198              : }
    1199              : 
    1200              : impl tokio_epoll_uring::IoFd for FileGuard {
    1201       953247 :     unsafe fn as_fd(&self) -> RawFd {
    1202       953247 :         let owned_fd: &OwnedFd = self.as_ref();
    1203       953247 :         owned_fd.as_raw_fd()
    1204       953247 :     }
    1205              : }
    1206              : 
    1207              : #[cfg(test)]
    1208              : impl VirtualFile {
    1209        20916 :     pub(crate) async fn read_blk(
    1210        20916 :         &self,
    1211        20916 :         blknum: u32,
    1212        20916 :         ctx: &RequestContext,
    1213        20916 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1214        20916 :         self.inner.read_blk(blknum, ctx).await
    1215        20916 :     }
    1216              : 
    1217          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
    1218          226 :         self.inner.read_to_end(buf, ctx).await
    1219          224 :     }
    1220              : }
    1221              : 
    1222              : #[cfg(test)]
    1223              : impl VirtualFileInner {
    1224        20916 :     pub(crate) async fn read_blk(
    1225        20916 :         &self,
    1226        20916 :         blknum: u32,
    1227        20916 :         ctx: &RequestContext,
    1228        20916 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1229              :         use crate::page_cache::PAGE_SZ;
    1230        20916 :         let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    1231        20916 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1232        20916 :         let slice = self
    1233        20916 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1234        10619 :             .await?;
    1235        20916 :         Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
    1236        20916 :             slice.into_inner(),
    1237        20916 :         ))
    1238        20916 :     }
    1239              : 
    1240          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
    1241          224 :         let mut tmp = vec![0; 128];
    1242              :         loop {
    1243          444 :             let slice = tmp.slice(..128);
    1244          444 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
    1245            2 :             match res {
    1246          222 :                 Ok(0) => return Ok(()),
    1247          220 :                 Ok(n) => {
    1248          220 :                     self.pos += n as u64;
    1249          220 :                     buf.extend_from_slice(&slice[..n]);
    1250          220 :                 }
    1251            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1252            2 :                 Err(e) => return Err(e),
    1253              :             }
    1254          220 :             tmp = slice.into_inner();
    1255              :         }
    1256          224 :     }
    1257              : }
    1258              : 
    1259              : impl Drop for VirtualFileInner {
    1260              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1261         5110 :     fn drop(&mut self) {
    1262         5110 :         let handle = self.handle.get_mut();
    1263              : 
    1264         5110 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1265         5110 :             if slot_guard.tag == tag {
    1266         4550 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1267              :                 // there is also operation "close-by-replace" for closes done on eviction for
    1268              :                 // comparison.
    1269         4550 :                 if let Some(fd) = slot_guard.file.take() {
    1270         4550 :                     STORAGE_IO_TIME_METRIC
    1271         4550 :                         .get(StorageIoOperation::Close)
    1272         4550 :                         .observe_closure_duration(|| drop(fd));
    1273         4550 :                 }
    1274          560 :             }
    1275         5110 :         }
    1276              : 
    1277              :         // We don't have async drop so we cannot directly await the lock here.
    1278              :         // Instead, first do a best-effort attempt at closing the underlying
    1279              :         // file descriptor by using `try_write`, and if that fails, spawn
    1280              :         // a tokio task to do it asynchronously: we just want it to be
    1281              :         // cleaned up eventually.
    1282              :         // Most of the time, the `try_lock` should succeed though,
    1283              :         // as we have `&mut self` access. In other words, if the slot
    1284              :         // is still occupied by our file, there should be no access from
    1285              :         // other I/O operations; the only other possible place to lock
    1286              :         // the slot is the lock algorithm looking for free slots.
    1287         5110 :         let slot = &get_open_files().slots[handle.index];
    1288         5110 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1289         5110 :             clean_slot(slot, slot_guard, handle.tag);
    1290         5110 :         } else {
    1291            0 :             let tag = handle.tag;
    1292            0 :             tokio::spawn(async move {
    1293            0 :                 let slot_guard = slot.inner.write().await;
    1294            0 :                 clean_slot(slot, slot_guard, tag);
    1295            0 :             });
    1296            0 :         };
    1297         5110 :     }
    1298              : }
    1299              : 
    1300              : impl OwnedAsyncWriter for VirtualFile {
    1301              :     #[inline(always)]
    1302         6597 :     async fn write_all<Buf: IoBuf + Send>(
    1303         6597 :         &mut self,
    1304         6597 :         buf: FullSlice<Buf>,
    1305         6597 :         ctx: &RequestContext,
    1306         6597 :     ) -> std::io::Result<(usize, FullSlice<Buf>)> {
    1307         6597 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
    1308         6597 :         res.map(move |v| (v, buf))
    1309         6597 :     }
    1310              : }
    1311              : 
    1312              : impl OpenFiles {
    1313          200 :     fn new(num_slots: usize) -> OpenFiles {
    1314          200 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1315         2000 :         for _ in 0..num_slots {
    1316         2000 :             let slot = Slot {
    1317         2000 :                 recently_used: AtomicBool::new(false),
    1318         2000 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1319         2000 :             };
    1320         2000 :             slots.push(slot);
    1321         2000 :         }
    1322              : 
    1323          200 :         OpenFiles {
    1324          200 :             next: AtomicUsize::new(0),
    1325          200 :             slots: Box::leak(slots),
    1326          200 :         }
    1327          200 :     }
    1328              : }
    1329              : 
    1330              : ///
    1331              : /// Initialize the virtual file module. This must be called once at page
    1332              : /// server startup.
    1333              : ///
    1334              : #[cfg(not(test))]
    1335            0 : pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
    1336            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1337            0 :         panic!("virtual_file::init called twice");
    1338            0 :     }
    1339            0 :     set_io_mode(mode);
    1340            0 :     io_engine::init(engine);
    1341            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1342            0 : }
    1343              : 
    1344              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1345              : 
    1346              : // Get a handle to the global slots array.
    1347      1917639 : fn get_open_files() -> &'static OpenFiles {
    1348      1917639 :     //
    1349      1917639 :     // In unit tests, page server startup doesn't happen and no one calls
    1350      1917639 :     // virtual_file::init(). Initialize it here, with a small array.
    1351      1917639 :     //
    1352      1917639 :     // This applies to the virtual file tests below, but all other unit
    1353      1917639 :     // tests too, so the virtual file facility is always usable in
    1354      1917639 :     // unit tests.
    1355      1917639 :     //
    1356      1917639 :     if cfg!(test) {
    1357      1917639 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1358              :     } else {
    1359            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1360              :     }
    1361      1917639 : }
    1362              : 
    1363              : /// Gets the io buffer alignment.
    1364            0 : pub(crate) const fn get_io_buffer_alignment() -> usize {
    1365            0 :     DEFAULT_IO_BUFFER_ALIGNMENT
    1366            0 : }
    1367              : 
    1368              : pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
    1369              : pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
    1370              : pub(crate) type IoPageSlice<'a> =
    1371              :     AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
    1372              : 
    1373              : static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
    1374              : 
    1375            0 : pub(crate) fn set_io_mode(mode: IoMode) {
    1376            0 :     IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
    1377            0 : }
    1378              : 
    1379         1190 : pub(crate) fn get_io_mode() -> IoMode {
    1380         1190 :     IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
    1381         1190 : }
    1382              : #[cfg(test)]
    1383              : mod tests {
    1384              :     use crate::context::DownloadBehavior;
    1385              :     use crate::task_mgr::TaskKind;
    1386              : 
    1387              :     use super::*;
    1388              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1389              :     use owned_buffers_io::slice::SliceMutExt;
    1390              :     use rand::seq::SliceRandom;
    1391              :     use rand::thread_rng;
    1392              :     use rand::Rng;
    1393              :     use std::io::Write;
    1394              :     use std::os::unix::fs::FileExt;
    1395              :     use std::sync::Arc;
    1396              : 
    1397              :     enum MaybeVirtualFile {
    1398              :         VirtualFile(VirtualFile),
    1399              :         File(File),
    1400              :     }
    1401              : 
    1402              :     impl From<VirtualFile> for MaybeVirtualFile {
    1403            6 :         fn from(vf: VirtualFile) -> Self {
    1404            6 :             MaybeVirtualFile::VirtualFile(vf)
    1405            6 :         }
    1406              :     }
    1407              : 
    1408              :     impl MaybeVirtualFile {
    1409          404 :         async fn read_exact_at(
    1410          404 :             &self,
    1411          404 :             mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
    1412          404 :             offset: u64,
    1413          404 :             ctx: &RequestContext,
    1414          404 :         ) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
    1415          404 :             match self {
    1416          203 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1417          202 :                 MaybeVirtualFile::File(file) => {
    1418          202 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1419          202 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1420              :                 }
    1421              :             }
    1422          404 :         }
    1423            8 :         async fn write_all_at<Buf: IoBuf + Send>(
    1424            8 :             &self,
    1425            8 :             buf: FullSlice<Buf>,
    1426            8 :             offset: u64,
    1427            8 :             ctx: &RequestContext,
    1428            8 :         ) -> Result<(), Error> {
    1429            8 :             match self {
    1430            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1431            4 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1432            4 :                     res
    1433              :                 }
    1434            4 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1435              :             }
    1436            8 :         }
    1437           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1438           36 :             match self {
    1439           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1440           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1441              :             }
    1442           36 :         }
    1443            8 :         async fn write_all<Buf: IoBuf + Send>(
    1444            8 :             &mut self,
    1445            8 :             buf: FullSlice<Buf>,
    1446            8 :             ctx: &RequestContext,
    1447            8 :         ) -> Result<(), Error> {
    1448            8 :             match self {
    1449            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1450            4 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1451            4 :                     res.map(|_| ())
    1452              :                 }
    1453            4 :                 MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
    1454              :             }
    1455            8 :         }
    1456              : 
    1457              :         // Helper function to slurp contents of a file, starting at the current position,
    1458              :         // into a string
    1459          442 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1460              :             use std::io::Read;
    1461          442 :             let mut buf = String::new();
    1462          442 :             match self {
    1463          224 :                 MaybeVirtualFile::VirtualFile(file) => {
    1464          224 :                     let mut buf = Vec::new();
    1465          226 :                     file.read_to_end(&mut buf, ctx).await?;
    1466          222 :                     return Ok(String::from_utf8(buf).unwrap());
    1467              :                 }
    1468          218 :                 MaybeVirtualFile::File(file) => {
    1469          218 :                     file.read_to_string(&mut buf)?;
    1470              :                 }
    1471              :             }
    1472          216 :             Ok(buf)
    1473          442 :         }
    1474              : 
    1475              :         // Helper function to slurp a portion of a file into a string
    1476          404 :         async fn read_string_at(
    1477          404 :             &mut self,
    1478          404 :             pos: u64,
    1479          404 :             len: usize,
    1480          404 :             ctx: &RequestContext,
    1481          404 :         ) -> Result<String, Error> {
    1482          404 :             let slice = IoBufferMut::with_capacity(len).slice_full();
    1483          404 :             assert_eq!(slice.bytes_total(), len);
    1484          404 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1485          404 :             let buf = slice.into_inner();
    1486          404 :             assert_eq!(buf.len(), len);
    1487              : 
    1488          404 :             Ok(String::from_utf8(buf.to_vec()).unwrap())
    1489          404 :         }
    1490              :     }
    1491              : 
    1492              :     #[tokio::test]
    1493            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1494            2 :         // The real work is done in the test_files() helper function. This
    1495            2 :         // allows us to run the same set of tests against a native File, and
    1496            2 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1497            2 :         // but this allows us to verify that the operations return the same
    1498            2 :         // results with VirtualFiles as with native Files. (Except that with
    1499            2 :         // native files, you will run out of file descriptors if the ulimit
    1500            2 :         // is low enough.)
    1501            2 :         struct A;
    1502            2 : 
    1503            2 :         impl Adapter for A {
    1504          206 :             async fn open(
    1505          206 :                 path: Utf8PathBuf,
    1506          206 :                 opts: OpenOptions,
    1507          206 :                 ctx: &RequestContext,
    1508          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1509          206 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1510          206 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1511          206 :             }
    1512            2 :         }
    1513          531 :         test_files::<A>("virtual_files").await
    1514            2 :     }
    1515              : 
    1516              :     #[tokio::test]
    1517            2 :     async fn test_physical_files() -> anyhow::Result<()> {
    1518            2 :         struct B;
    1519            2 : 
    1520            2 :         impl Adapter for B {
    1521          206 :             async fn open(
    1522          206 :                 path: Utf8PathBuf,
    1523          206 :                 opts: OpenOptions,
    1524          206 :                 _ctx: &RequestContext,
    1525          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1526            2 :                 Ok(MaybeVirtualFile::File({
    1527          206 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1528          206 :                     File::from(owned_fd)
    1529            2 :                 }))
    1530          206 :             }
    1531            2 :         }
    1532            2 : 
    1533          104 :         test_files::<B>("physical_files").await
    1534            2 :     }
    1535              : 
    1536              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
    1537              :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
    1538              :     /// in trait which benefits from the new lifetime capture rules already.
    1539              :     trait Adapter {
    1540              :         async fn open(
    1541              :             path: Utf8PathBuf,
    1542              :             opts: OpenOptions,
    1543              :             ctx: &RequestContext,
    1544              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1545              :     }
    1546              : 
    1547            4 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    1548            4 :     where
    1549            4 :         A: Adapter,
    1550            4 :     {
    1551            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1552            4 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1553            4 :         std::fs::create_dir_all(&testdir)?;
    1554              : 
    1555            4 :         let path_a = testdir.join("file_a");
    1556            4 :         let mut file_a = A::open(
    1557            4 :             path_a.clone(),
    1558            4 :             OpenOptions::new()
    1559            4 :                 .write(true)
    1560            4 :                 .create(true)
    1561            4 :                 .truncate(true)
    1562            4 :                 .to_owned(),
    1563            4 :             &ctx,
    1564            4 :         )
    1565            4 :         .await?;
    1566            4 :         file_a
    1567            4 :             .write_all(b"foobar".to_vec().slice_len(), &ctx)
    1568            1 :             .await?;
    1569              : 
    1570              :         // cannot read from a file opened in write-only mode
    1571            4 :         let _ = file_a.read_string(&ctx).await.unwrap_err();
    1572              : 
    1573              :         // Close the file and re-open for reading
    1574            4 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
    1575              : 
    1576              :         // cannot write to a file opened in read-only mode
    1577            4 :         let _ = file_a
    1578            4 :             .write_all(b"bar".to_vec().slice_len(), &ctx)
    1579            1 :             .await
    1580            4 :             .unwrap_err();
    1581            4 : 
    1582            4 :         // Try simple read
    1583            4 :         assert_eq!("foobar", file_a.read_string(&ctx).await?);
    1584              : 
    1585              :         // It's positioned at the EOF now.
    1586            4 :         assert_eq!("", file_a.read_string(&ctx).await?);
    1587              : 
    1588              :         // Test seeks.
    1589            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1590            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1591              : 
    1592            4 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1593            4 :         assert_eq!("ar", file_a.read_string(&ctx).await?);
    1594              : 
    1595            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1596            4 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1597            4 :         assert_eq!("bar", file_a.read_string(&ctx).await?);
    1598              : 
    1599            4 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1600            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1601              : 
    1602              :         // Test erroneous seeks to before byte 0
    1603            4 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1604            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1605            4 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1606            4 : 
    1607            4 :         // the erroneous seek should have left the position unchanged
    1608            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1609              : 
    1610              :         // Create another test file, and try FileExt functions on it.
    1611            4 :         let path_b = testdir.join("file_b");
    1612            4 :         let mut file_b = A::open(
    1613            4 :             path_b.clone(),
    1614            4 :             OpenOptions::new()
    1615            4 :                 .read(true)
    1616            4 :                 .write(true)
    1617            4 :                 .create(true)
    1618            4 :                 .truncate(true)
    1619            4 :                 .to_owned(),
    1620            4 :             &ctx,
    1621            4 :         )
    1622            2 :         .await?;
    1623            4 :         file_b
    1624            4 :             .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
    1625            1 :             .await?;
    1626            4 :         file_b
    1627            4 :             .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
    1628            1 :             .await?;
    1629              : 
    1630            4 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1631              : 
    1632              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1633              :         // open the same file many times. The effect is the same.)
    1634              :         //
    1635              :         // leave file_a positioned at offset 1 before we start
    1636            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1637              : 
    1638            4 :         let mut vfiles = Vec::new();
    1639          404 :         for _ in 0..100 {
    1640          400 :             let mut vfile = A::open(
    1641          400 :                 path_b.clone(),
    1642          400 :                 OpenOptions::new().read(true).to_owned(),
    1643          400 :                 &ctx,
    1644          400 :             )
    1645          200 :             .await?;
    1646          400 :             assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
    1647          400 :             vfiles.push(vfile);
    1648              :         }
    1649              : 
    1650              :         // make sure we opened enough files to definitely cause evictions.
    1651            4 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1652              : 
    1653              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1654              :         // from it again. We left the file positioned at offset 1 above.
    1655            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1656              : 
    1657              :         // Check that all the other FDs still work too. Use them in random order for
    1658              :         // good measure.
    1659            4 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1660          400 :         for vfile in vfiles.iter_mut() {
    1661          400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1662              :         }
    1663              : 
    1664            4 :         Ok(())
    1665            4 :     }
    1666              : 
    1667              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1668              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1669              :     /// VirtualFile from multiple threads concurrently.
    1670              :     #[tokio::test]
    1671            2 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1672            2 :         const SIZE: usize = 8 * 1024;
    1673            2 :         const VIRTUAL_FILES: usize = 100;
    1674            2 :         const THREADS: usize = 100;
    1675            2 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1676            2 : 
    1677            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1678            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1679            2 :         std::fs::create_dir_all(&testdir)?;
    1680            2 : 
    1681            2 :         // Create a test file.
    1682            2 :         let test_file_path = testdir.join("concurrency_test_file");
    1683            2 :         {
    1684            2 :             let file = File::create(&test_file_path)?;
    1685            2 :             file.write_all_at(&SAMPLE, 0)?;
    1686            2 :         }
    1687            2 : 
    1688            2 :         // Open the file many times.
    1689            2 :         let mut files = Vec::new();
    1690          202 :         for _ in 0..VIRTUAL_FILES {
    1691          200 :             let f = VirtualFileInner::open_with_options(
    1692          200 :                 &test_file_path,
    1693          200 :                 OpenOptions::new().read(true),
    1694          200 :                 &ctx,
    1695          200 :             )
    1696          101 :             .await?;
    1697          200 :             files.push(f);
    1698            2 :         }
    1699            2 :         let files = Arc::new(files);
    1700            2 : 
    1701            2 :         // Launch many threads, and use the virtual files concurrently in random order.
    1702            2 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1703            2 :             .worker_threads(THREADS)
    1704            2 :             .thread_name("test_vfile_concurrency thread")
    1705            2 :             .build()
    1706            2 :             .unwrap();
    1707            2 :         let mut hdls = Vec::new();
    1708          202 :         for _threadno in 0..THREADS {
    1709          200 :             let files = files.clone();
    1710          200 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1711          200 :             let hdl = rt.spawn(async move {
    1712          200 :                 let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
    1713          200 :                 let mut rng = rand::rngs::OsRng;
    1714       200000 :                 for _ in 1..1000 {
    1715       199800 :                     let f = &files[rng.gen_range(0..files.len())];
    1716       199800 :                     buf = f
    1717       199800 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1718       585061 :                         .await
    1719       199800 :                         .unwrap()
    1720       199800 :                         .into_inner();
    1721       199800 :                     assert!(buf[..] == SAMPLE);
    1722            2 :                 }
    1723          200 :             });
    1724          200 :             hdls.push(hdl);
    1725          200 :         }
    1726          202 :         for hdl in hdls {
    1727          200 :             hdl.await?;
    1728            2 :         }
    1729            2 :         std::mem::forget(rt);
    1730            2 : 
    1731            2 :         Ok(())
    1732            2 :     }
    1733              : 
    1734              :     #[tokio::test]
    1735            2 :     async fn test_atomic_overwrite_basic() {
    1736            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1737            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1738            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1739            2 : 
    1740            2 :         let path = testdir.join("myfile");
    1741            2 :         let tmp_path = testdir.join("myfile.tmp");
    1742            2 : 
    1743            2 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1744            2 :             .await
    1745            2 :             .unwrap();
    1746            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1747            2 :         let post = file.read_string(&ctx).await.unwrap();
    1748            2 :         assert_eq!(post, "foo");
    1749            2 :         assert!(!tmp_path.exists());
    1750            2 :         drop(file);
    1751            2 : 
    1752            2 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1753            2 :             .await
    1754            2 :             .unwrap();
    1755            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1756            2 :         let post = file.read_string(&ctx).await.unwrap();
    1757            2 :         assert_eq!(post, "bar");
    1758            2 :         assert!(!tmp_path.exists());
    1759            2 :         drop(file);
    1760            2 :     }
    1761              : 
    1762              :     #[tokio::test]
    1763            2 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1764            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1765            2 :         let testdir =
    1766            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1767            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1768            2 : 
    1769            2 :         let path = testdir.join("myfile");
    1770            2 :         let tmp_path = testdir.join("myfile.tmp");
    1771            2 : 
    1772            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1773            2 :         assert!(tmp_path.exists());
    1774            2 : 
    1775            2 :         VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1776            2 :             .await
    1777            2 :             .unwrap();
    1778            2 : 
    1779            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1780            2 :         let post = file.read_string(&ctx).await.unwrap();
    1781            2 :         assert_eq!(post, "foo");
    1782            2 :         assert!(!tmp_path.exists());
    1783            2 :         drop(file);
    1784            2 :     }
    1785              : }
        

Generated by: LCOV version 2.1-beta