LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 93.6 % 1020 955
Test Date: 2024-08-29 11:33:10 Functions: 91.7 % 204 187

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
      14              : use crate::context::RequestContext;
      15              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      16              : 
      17              : use crate::page_cache::{PageWriteGuard, PAGE_SZ};
      18              : use crate::tenant::TENANTS_SEGMENT_NAME;
      19              : use camino::{Utf8Path, Utf8PathBuf};
      20              : use once_cell::sync::OnceCell;
      21              : use owned_buffers_io::io_buf_ext::FullSlice;
      22              : use pageserver_api::shard::TenantShardId;
      23              : use std::fs::File;
      24              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      25              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      26              : 
      27              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      28              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      29              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      30              : use tokio::time::Instant;
      31              : 
      32              : pub use pageserver_api::models::virtual_file as api;
      33              : pub(crate) mod io_engine;
      34              : pub use io_engine::feature_test as io_engine_feature_test;
      35              : pub use io_engine::io_engine_for_bench;
      36              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      37              : mod metadata;
      38              : mod open_options;
      39              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      40              : pub(crate) use api::DirectIoMode;
      41              : pub(crate) use io_engine::IoEngineKind;
      42              : pub(crate) use metadata::Metadata;
      43              : pub(crate) use open_options::*;
      44              : 
      45              : pub(crate) mod owned_buffers_io {
      46              :     //! Abstractions for IO with owned buffers.
      47              :     //!
      48              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      49              :     //! reason we need this abstraction.
      50              :     //!
      51              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      52              :     //! but for the time being we're proving out the primitives in the neon.git repo
      53              :     //! for faster iteration.
      54              : 
      55              :     pub(crate) mod io_buf_ext;
      56              :     pub(crate) mod slice;
      57              :     pub(crate) mod write;
      58              :     pub(crate) mod util {
      59              :         pub(crate) mod size_tracking_writer;
      60              :     }
      61              : }
      62              : 
      63              : ///
      64              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      65              : /// the underlying file is closed if the system is low on file descriptors,
      66              : /// and re-opened when it's accessed again.
      67              : ///
      68              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      69              : /// holding just a shared reference the same VirtualFile, using the read_at() / write_at()
      70              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      71              : /// require a mutable reference, because they modify the "current position".
      72              : ///
      73              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      74              : /// slot that 'handle points to, if the underlying file is currently open. If it's not
      75              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      76              : /// 'tag' field is used to detect whether the handle still is valid or not.
      77              : ///
      78              : #[derive(Debug)]
      79              : pub struct VirtualFile {
      80              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      81              :     /// might contain our File, or it may be empty, or it may contain a File that
      82              :     /// belongs to a different VirtualFile.
      83              :     handle: RwLock<SlotHandle>,
      84              : 
      85              :     /// Current file position
      86              :     pos: u64,
      87              : 
      88              :     /// File path and options to use to open it.
      89              :     ///
      90              :     /// Note: this only contains the options needed to re-open it. For example,
      91              :     /// if a new file is created, we only pass the create flag when it's initially
      92              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      93              :     /// storing it here.
      94              :     pub path: Utf8PathBuf,
      95              :     open_options: OpenOptions,
      96              : 
      97              :     // These are strings becase we only use them for metrics, and those expect strings.
      98              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      99              :     // strings.
     100              :     tenant_id: String,
     101              :     shard_id: String,
     102              :     timeline_id: String,
     103              : }
     104              : 
     105              : #[derive(Debug, PartialEq, Clone, Copy)]
     106              : struct SlotHandle {
     107              :     /// Index into OPEN_FILES.slots
     108              :     index: usize,
     109              : 
     110              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     111              :     /// been recycled and no longer contains the FD for this virtual file.
     112              :     tag: u64,
     113              : }
     114              : 
     115              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     116              : /// are currently open. Each slot in the array is protected by a separate lock,
     117              : /// so that different files can be accessed independently. The lock must be held
     118              : /// in write mode to replace the slot with a different file, but a read mode
     119              : /// is enough to operate on the file, whether you're reading or writing to it.
     120              : ///
     121              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     122              : /// the virtual_file::init() function. It must be called exactly once at page
     123              : /// server startup.
     124              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     125              : 
     126              : struct OpenFiles {
     127              :     slots: &'static [Slot],
     128              : 
     129              :     /// clock arm for the clock algorithm
     130              :     next: AtomicUsize,
     131              : }
     132              : 
     133              : struct Slot {
     134              :     inner: RwLock<SlotInner>,
     135              : 
     136              :     /// has this file been used since last clock sweep?
     137              :     recently_used: AtomicBool,
     138              : }
     139              : 
     140              : struct SlotInner {
     141              :     /// Counter that's incremented every time a different file is stored here.
     142              :     /// To avoid the ABA problem.
     143              :     tag: u64,
     144              : 
     145              :     /// the underlying file
     146              :     file: Option<OwnedFd>,
     147              : }
     148              : 
     149              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     150              : struct PageWriteGuardBuf {
     151              :     page: PageWriteGuard<'static>,
     152              : }
     153              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     154              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     155              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     156              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     157              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     158       621481 :     fn stable_ptr(&self) -> *const u8 {
     159       621481 :         self.page.as_ptr()
     160       621481 :     }
     161      1165278 :     fn bytes_init(&self) -> usize {
     162      1165278 :         self.page.len()
     163      1165278 :     }
     164       466113 :     fn bytes_total(&self) -> usize {
     165       466113 :         self.page.len()
     166       466113 :     }
     167              : }
     168              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     169              : // hence it's safe to hand out the `stable_mut_ptr()`.
     170              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     171       233055 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     172       233055 :         self.page.as_mut_ptr()
     173       233055 :     }
     174              : 
     175       155371 :     unsafe fn set_init(&mut self, pos: usize) {
     176       155371 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     177       155371 :         assert!(pos <= self.page.len());
     178       155371 :     }
     179              : }
     180              : 
     181              : impl OpenFiles {
     182              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     183              :     ///
     184              :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     185              :     /// recently_used has been set. It's all ready for reuse.
     186       568180 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     187       568180 :         //
     188       568180 :         // Run the clock algorithm to find a slot to replace.
     189       568180 :         //
     190       568180 :         let num_slots = self.slots.len();
     191       568180 :         let mut retries = 0;
     192              :         let mut slot;
     193              :         let mut slot_guard;
     194              :         let index;
     195      5941939 :         loop {
     196      5941939 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     197      5941939 :             slot = &self.slots[next];
     198      5941939 : 
     199      5941939 :             // If the recently_used flag on this slot is set, continue the clock
     200      5941939 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     201      5941939 :             // lock, also continue the clock sweep.
     202      5941939 :             //
     203      5941939 :             // We only continue in this manner for a while, though. If we loop
     204      5941939 :             // through the array twice without finding a victim, just pick the
     205      5941939 :             // next slot and wait until we can reuse it. This way, we avoid
     206      5941939 :             // spinning in the extreme case that all the slots are busy with an
     207      5941939 :             // I/O operation.
     208      5941939 :             if retries < num_slots * 2 {
     209      5735859 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     210      5121563 :                     if let Ok(guard) = slot.inner.try_write() {
     211       362100 :                         slot_guard = guard;
     212       362100 :                         index = next;
     213       362100 :                         break;
     214      4759463 :                     }
     215       614296 :                 }
     216      5373759 :                 retries += 1;
     217              :             } else {
     218       206080 :                 slot_guard = slot.inner.write().await;
     219       206080 :                 index = next;
     220       206080 :                 break;
     221              :             }
     222              :         }
     223              : 
     224              :         //
     225              :         // We now have the victim slot locked. If it was in use previously, close the
     226              :         // old file.
     227              :         //
     228       568180 :         if let Some(old_file) = slot_guard.file.take() {
     229       553972 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     230       553972 :             // distinguish the two.
     231       553972 :             STORAGE_IO_TIME_METRIC
     232       553972 :                 .get(StorageIoOperation::CloseByReplace)
     233       553972 :                 .observe_closure_duration(|| drop(old_file));
     234       553972 :         }
     235              : 
     236              :         // Prepare the slot for reuse and return it
     237       568180 :         slot_guard.tag += 1;
     238       568180 :         slot.recently_used.store(true, Ordering::Relaxed);
     239       568180 :         (
     240       568180 :             SlotHandle {
     241       568180 :                 index,
     242       568180 :                 tag: slot_guard.tag,
     243       568180 :             },
     244       568180 :             slot_guard,
     245       568180 :         )
     246       568180 :     }
     247              : }
     248              : 
     249              : /// Identify error types that should alwways terminate the process.  Other
     250              : /// error types may be elegible for retry.
     251            6 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     252            6 :     use nix::errno::Errno::*;
     253            6 :     match e.raw_os_error().map(nix::errno::from_i32) {
     254              :         Some(EIO) => {
     255              :             // Terminate on EIO because we no longer trust the device to store
     256              :             // data safely, or to uphold persistence guarantees on fsync.
     257            0 :             true
     258              :         }
     259              :         Some(EROFS) => {
     260              :             // Terminate on EROFS because a filesystem is usually remounted
     261              :             // readonly when it has experienced some critical issue, so the same
     262              :             // logic as EIO applies.
     263            0 :             true
     264              :         }
     265              :         Some(EACCES) => {
     266              :             // Terminate on EACCESS because we should always have permissions
     267              :             // for our own data dir: if we don't, then we can't do our job and
     268              :             // need administrative intervention to fix permissions.  Terminating
     269              :             // is the best way to make sure we stop cleanly rather than going
     270              :             // into infinite retry loops, and will make it clear to the outside
     271              :             // world that we need help.
     272            0 :             true
     273              :         }
     274              :         _ => {
     275              :             // Treat all other local file I/O errors are retryable.  This includes:
     276              :             // - ENOSPC: we stay up and wait for eviction to free some space
     277              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     278              :             // - WriteZero, Interrupted: these are used internally VirtualFile
     279            6 :             false
     280              :         }
     281              :     }
     282            6 : }
     283              : 
     284              : /// Call this when the local filesystem gives us an error with an external
     285              : /// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
     286              : /// bad storage or bad configuration, and we can't fix that from inside
     287              : /// a running process.
     288            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     289            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     290            0 :     std::process::abort();
     291              : }
     292              : 
     293              : pub(crate) trait MaybeFatalIo<T> {
     294              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     295              :     fn fatal_err(self, context: &str) -> T;
     296              : }
     297              : 
     298              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     299              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     300              :     ///
     301              :     /// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but
     302              :     /// not on ENOSPC.
     303      3407308 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     304      3407308 :         if let Err(e) = &self {
     305            6 :             if is_fatal_io_error(e) {
     306            0 :                 on_fatal_io_error(e, context);
     307            6 :             }
     308      3407302 :         }
     309      3407308 :         self
     310      3407308 :     }
     311              : 
     312              :     /// Terminate the process on any I/O error.
     313              :     ///
     314              :     /// This is appropriate for reads on files that we know exist: they should always work.
     315         6114 :     fn fatal_err(self, context: &str) -> T {
     316         6114 :         match self {
     317         6114 :             Ok(v) => v,
     318            0 :             Err(e) => {
     319            0 :                 on_fatal_io_error(&e, context);
     320              :             }
     321              :         }
     322         6114 :     }
     323              : }
     324              : 
     325              : /// Observe duration for the given storage I/O operation
     326              : ///
     327              : /// Unlike `observe_closure_duration`, this supports async,
     328              : /// where "support" means that we measure wall clock time.
     329              : macro_rules! observe_duration {
     330              :     ($op:expr, $($body:tt)*) => {{
     331              :         let instant = Instant::now();
     332              :         let result = $($body)*;
     333              :         let elapsed = instant.elapsed().as_secs_f64();
     334              :         STORAGE_IO_TIME_METRIC
     335              :             .get($op)
     336              :             .observe(elapsed);
     337              :         result
     338              :     }}
     339              : }
     340              : 
     341              : macro_rules! with_file {
     342              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     343              :         let $ident = $this.lock_file().await?;
     344              :         observe_duration!($op, $($body)*)
     345              :     }};
     346              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     347              :         let mut $ident = $this.lock_file().await?;
     348              :         observe_duration!($op, $($body)*)
     349              :     }};
     350              : }
     351              : 
     352              : impl VirtualFile {
     353              :     /// Open a file in read-only mode. Like File::open.
     354         6576 :     pub async fn open<P: AsRef<Utf8Path>>(
     355         6576 :         path: P,
     356         6576 :         ctx: &RequestContext,
     357         6576 :     ) -> Result<VirtualFile, std::io::Error> {
     358         6576 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     359         6576 :     }
     360              : 
     361              :     /// Create a new file for writing. If the file exists, it will be truncated.
     362              :     /// Like File::create.
     363         4377 :     pub async fn create<P: AsRef<Utf8Path>>(
     364         4377 :         path: P,
     365         4377 :         ctx: &RequestContext,
     366         4377 :     ) -> Result<VirtualFile, std::io::Error> {
     367         4377 :         Self::open_with_options(
     368         4377 :             path.as_ref(),
     369         4377 :             OpenOptions::new().write(true).create(true).truncate(true),
     370         4377 :             ctx,
     371         4377 :         )
     372         2265 :         .await
     373         4377 :     }
     374              : 
     375              :     /// Open a file with given options.
     376              :     ///
     377              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     378              :     /// they will be applied also when the file is subsequently re-opened, not only
     379              :     /// on the first time. Make sure that's sane!
     380        17475 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     381        17475 :         path: P,
     382        17475 :         open_options: &OpenOptions,
     383        17475 :         _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     384        17475 :     ) -> Result<VirtualFile, std::io::Error> {
     385        17475 :         let path_ref = path.as_ref();
     386        17475 :         let path_str = path_ref.to_string();
     387        17475 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     388        17475 :         let (tenant_id, shard_id, timeline_id) =
     389        17475 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     390        12993 :                 let tenant_shard_part = parts[parts.len() - 4];
     391        12993 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     392        12993 :                     Ok(tenant_shard_id) => (
     393        12993 :                         tenant_shard_id.tenant_id.to_string(),
     394        12993 :                         format!("{}", tenant_shard_id.shard_slug()),
     395        12993 :                     ),
     396              :                     Err(_) => {
     397              :                         // Malformed path: this ID is just for observability, so tolerate it
     398              :                         // and pass through
     399            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     400              :                     }
     401              :                 };
     402        12993 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     403              :             } else {
     404         4482 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     405              :             };
     406        17475 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     407              : 
     408              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     409              :         // where our caller doesn't get to use the returned VirtualFile before its
     410              :         // slot gets re-used by someone else.
     411        17475 :         let file = observe_duration!(StorageIoOperation::Open, {
     412        17475 :             open_options.open(path_ref.as_std_path()).await?
     413              :         });
     414              : 
     415              :         // Strip all options other than read and write.
     416              :         //
     417              :         // It would perhaps be nicer to check just for the read and write flags
     418              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     419              :         // only to set them.
     420        17475 :         let mut reopen_options = open_options.clone();
     421        17475 :         reopen_options.create(false);
     422        17475 :         reopen_options.create_new(false);
     423        17475 :         reopen_options.truncate(false);
     424        17475 : 
     425        17475 :         let vfile = VirtualFile {
     426        17475 :             handle: RwLock::new(handle),
     427        17475 :             pos: 0,
     428        17475 :             path: path_ref.to_path_buf(),
     429        17475 :             open_options: reopen_options,
     430        17475 :             tenant_id,
     431        17475 :             shard_id,
     432        17475 :             timeline_id,
     433        17475 :         };
     434        17475 : 
     435        17475 :         // TODO: Under pressure, it's likely the slot will get re-used and
     436        17475 :         // the underlying file closed before they get around to using it.
     437        17475 :         // => https://github.com/neondatabase/neon/issues/6065
     438        17475 :         slot_guard.file.replace(file);
     439        17475 : 
     440        17475 :         Ok(vfile)
     441        17475 :     }
     442              : 
     443              :     /// Async version of [`::utils::crashsafe::overwrite`].
     444              :     ///
     445              :     /// # NB:
     446              :     ///
     447              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     448              :     /// it did at an earlier time.
     449              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     450           84 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     451           84 :         final_path: Utf8PathBuf,
     452           84 :         tmp_path: Utf8PathBuf,
     453           84 :         content: B,
     454           84 :     ) -> std::io::Result<()> {
     455           84 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     456           84 :         // See https://github.com/neondatabase/neon/issues/6663
     457           84 : 
     458           84 :         tokio::task::spawn_blocking(move || {
     459           84 :             let slice_storage;
     460           84 :             let content_len = content.bytes_init();
     461           84 :             let content = if content.bytes_init() > 0 {
     462           84 :                 slice_storage = Some(content.slice(0..content_len));
     463           84 :                 slice_storage.as_deref().expect("just set it to Some()")
     464              :             } else {
     465            0 :                 &[]
     466              :             };
     467           84 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     468           84 :         })
     469           84 :         .await
     470           84 :         .expect("blocking task is never aborted")
     471           84 :     }
     472              : 
     473              :     /// Call File::sync_all() on the underlying File.
     474         8115 :     pub async fn sync_all(&self) -> Result<(), Error> {
     475         8115 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     476         8115 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     477         8115 :             res
     478              :         })
     479         8115 :     }
     480              : 
     481              :     /// Call File::sync_data() on the underlying File.
     482            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     483            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     484            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     485            0 :             res
     486              :         })
     487            0 :     }
     488              : 
     489         5106 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     490         5106 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     491         5106 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     492         5106 :             res
     493              :         })
     494         5106 :     }
     495              : 
     496              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     497              :     /// opens it and evicts some other File if necessary. The passed parameter is
     498              :     /// assumed to be a function available for the physical `File`.
     499              :     ///
     500              :     /// We are doing it via a macro as Rust doesn't support async closures that
     501              :     /// take on parameters with lifetimes.
     502      5773385 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     503      5773385 :         let open_files = get_open_files();
     504              : 
     505       550705 :         let mut handle_guard = {
     506              :             // Read the cached slot handle, and see if the slot that it points to still
     507              :             // contains our File.
     508              :             //
     509              :             // We only need to hold the handle lock while we read the current handle. If
     510              :             // another thread closes the file and recycles the slot for a different file,
     511              :             // we will notice that the handle we read is no longer valid and retry.
     512      5773385 :             let mut handle = *self.handle.read().await;
     513      6043670 :             loop {
     514      6043670 :                 // Check if the slot contains our File
     515      6043670 :                 {
     516      6043670 :                     let slot = &open_files.slots[handle.index];
     517      6043670 :                     let slot_guard = slot.inner.read().await;
     518      6043670 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     519              :                         // Found a cached file descriptor.
     520      5222680 :                         slot.recently_used.store(true, Ordering::Relaxed);
     521      5222680 :                         return Ok(FileGuard { slot_guard });
     522       820990 :                     }
     523              :                 }
     524              : 
     525              :                 // The slot didn't contain our File. We will have to open it ourselves,
     526              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     527              :                 // that no other thread will try to concurrently open the same file.
     528       820990 :                 let handle_guard = self.handle.write().await;
     529              : 
     530              :                 // If another thread changed the handle while we were not holding the lock,
     531              :                 // then the handle might now be valid again. Loop back to retry.
     532       820990 :                 if *handle_guard != handle {
     533       270285 :                     handle = *handle_guard;
     534       270285 :                     continue;
     535       550705 :                 }
     536       550705 :                 break handle_guard;
     537              :             }
     538              :         };
     539              : 
     540              :         // We need to open the file ourselves. The handle in the VirtualFile is
     541              :         // now locked in write-mode. Find a free slot to put it in.
     542       550705 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     543              : 
     544              :         // Re-open the physical file.
     545              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     546              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     547              :         // of the virtual file descriptor cache.
     548       550705 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     549       550705 :             self.open_options.open(self.path.as_std_path()).await?
     550              :         });
     551              : 
     552              :         // Store the File in the slot and update the handle in the VirtualFile
     553              :         // to point to it.
     554       550705 :         slot_guard.file.replace(file);
     555       550705 : 
     556       550705 :         *handle_guard = handle;
     557       550705 : 
     558       550705 :         return Ok(FileGuard {
     559       550705 :             slot_guard: slot_guard.downgrade(),
     560       550705 :         });
     561      5773385 :     }
     562              : 
     563          618 :     pub fn remove(self) {
     564          618 :         let path = self.path.clone();
     565          618 :         drop(self);
     566          618 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     567          618 :     }
     568              : 
     569        15972 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     570        15972 :         match pos {
     571        15942 :             SeekFrom::Start(offset) => {
     572        15942 :                 self.pos = offset;
     573        15942 :             }
     574           12 :             SeekFrom::End(offset) => {
     575           12 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     576           12 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     577              :             }
     578           18 :             SeekFrom::Current(offset) => {
     579           18 :                 let pos = self.pos as i128 + offset as i128;
     580           18 :                 if pos < 0 {
     581            6 :                     return Err(Error::new(
     582            6 :                         ErrorKind::InvalidInput,
     583            6 :                         "offset would be negative",
     584            6 :                     ));
     585           12 :                 }
     586           12 :                 if pos > u64::MAX as i128 {
     587            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     588           12 :                 }
     589           12 :                 self.pos = pos as u64;
     590              :             }
     591              :         }
     592        15960 :         Ok(self.pos)
     593        15972 :     }
     594              : 
     595              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     596              :     ///
     597              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     598      2351578 :     pub async fn read_exact_at<Buf>(
     599      2351578 :         &self,
     600      2351578 :         slice: Slice<Buf>,
     601      2351578 :         offset: u64,
     602      2351578 :         ctx: &RequestContext,
     603      2351578 :     ) -> Result<Slice<Buf>, Error>
     604      2351578 :     where
     605      2351578 :         Buf: IoBufMut + Send,
     606      2351578 :     {
     607      2351578 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     608      2351578 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     609              :         } else {
     610            0 :             None
     611              :         };
     612              : 
     613      2351578 :         let original_bounds = slice.bounds();
     614      2351578 :         let (buf, res) =
     615      2509375 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     616      2351578 :         let res = res.map(|_| buf.slice(original_bounds));
     617              : 
     618      2351578 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     619      2351578 :             if let Ok(slice) = &res {
     620      2351578 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     621      2351578 :                 assert_eq!(original_bounds, returned_bounds);
     622            0 :             }
     623            0 :         }
     624              : 
     625      2351578 :         res
     626      2351578 :     }
     627              : 
     628              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     629       155371 :     pub async fn read_exact_at_page(
     630       155371 :         &self,
     631       155371 :         page: PageWriteGuard<'static>,
     632       155371 :         offset: u64,
     633       155371 :         ctx: &RequestContext,
     634       155371 :     ) -> Result<PageWriteGuard<'static>, Error> {
     635       155371 :         let buf = PageWriteGuardBuf { page }.slice_full();
     636       155371 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     637       155371 :         self.read_exact_at(buf, offset, ctx)
     638        96199 :             .await
     639       155371 :             .map(|slice| slice.into_inner().page)
     640       155371 :     }
     641              : 
     642              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     643           12 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     644           12 :         &self,
     645           12 :         buf: FullSlice<Buf>,
     646           12 :         mut offset: u64,
     647           12 :         ctx: &RequestContext,
     648           12 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     649           12 :         let buf = buf.into_raw_slice();
     650           12 :         let bounds = buf.bounds();
     651           12 :         let restore =
     652           12 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     653           12 :         let mut buf = buf;
     654           24 :         while !buf.is_empty() {
     655           12 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     656           12 :             buf = tmp.into_raw_slice();
     657            0 :             match res {
     658              :                 Ok(0) => {
     659            0 :                     return (
     660            0 :                         restore(buf),
     661            0 :                         Err(Error::new(
     662            0 :                             std::io::ErrorKind::WriteZero,
     663            0 :                             "failed to write whole buffer",
     664            0 :                         )),
     665            0 :                     );
     666              :                 }
     667           12 :                 Ok(n) => {
     668           12 :                     buf = buf.slice(n..);
     669           12 :                     offset += n as u64;
     670           12 :                 }
     671            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     672            0 :                 Err(e) => return (restore(buf), Err(e)),
     673              :             }
     674              :         }
     675           12 :         (restore(buf), Ok(()))
     676           12 :     }
     677              : 
     678              :     /// Writes `buf` to the file at the current offset.
     679              :     ///
     680              :     /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
     681      3407344 :     pub async fn write_all<Buf: IoBuf + Send>(
     682      3407344 :         &mut self,
     683      3407344 :         buf: FullSlice<Buf>,
     684      3407344 :         ctx: &RequestContext,
     685      3407344 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     686      3407344 :         let buf = buf.into_raw_slice();
     687      3407344 :         let bounds = buf.bounds();
     688      3407344 :         let restore =
     689      3407344 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     690      3407344 :         let nbytes = buf.len();
     691      3407344 :         let mut buf = buf;
     692      6814568 :         while !buf.is_empty() {
     693      3407230 :             let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
     694      3407230 :             buf = tmp.into_raw_slice();
     695            6 :             match res {
     696              :                 Ok(0) => {
     697            0 :                     return (
     698            0 :                         restore(buf),
     699            0 :                         Err(Error::new(
     700            0 :                             std::io::ErrorKind::WriteZero,
     701            0 :                             "failed to write whole buffer",
     702            0 :                         )),
     703            0 :                     );
     704              :                 }
     705      3407224 :                 Ok(n) => {
     706      3407224 :                     buf = buf.slice(n..);
     707      3407224 :                 }
     708            6 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     709            6 :                 Err(e) => return (restore(buf), Err(e)),
     710              :             }
     711              :         }
     712      3407338 :         (restore(buf), Ok(nbytes))
     713      3407344 :     }
     714              : 
     715      3407230 :     async fn write<B: IoBuf + Send>(
     716      3407230 :         &mut self,
     717      3407230 :         buf: FullSlice<B>,
     718      3407230 :         ctx: &RequestContext,
     719      3407230 :     ) -> (FullSlice<B>, Result<usize, std::io::Error>) {
     720      3407230 :         let pos = self.pos;
     721      3407230 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     722      3407230 :         let n = match res {
     723      3407224 :             Ok(n) => n,
     724            6 :             Err(e) => return (buf, Err(e)),
     725              :         };
     726      3407224 :         self.pos += n as u64;
     727      3407224 :         (buf, Ok(n))
     728      3407230 :     }
     729              : 
     730      2352910 :     pub(crate) async fn read_at<Buf>(
     731      2352910 :         &self,
     732      2352910 :         buf: tokio_epoll_uring::Slice<Buf>,
     733      2352910 :         offset: u64,
     734      2352910 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     735      2352910 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     736      2352910 :     where
     737      2352910 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     738      2352910 :     {
     739      2352910 :         let file_guard = match self.lock_file().await {
     740      2352910 :             Ok(file_guard) => file_guard,
     741            0 :             Err(e) => return (buf, Err(e)),
     742              :         };
     743              : 
     744      2352910 :         observe_duration!(StorageIoOperation::Read, {
     745      2352910 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     746      2352910 :             if let Ok(size) = res {
     747      2352904 :                 STORAGE_IO_SIZE
     748      2352904 :                     .with_label_values(&[
     749      2352904 :                         "read",
     750      2352904 :                         &self.tenant_id,
     751      2352904 :                         &self.shard_id,
     752      2352904 :                         &self.timeline_id,
     753      2352904 :                     ])
     754      2352904 :                     .add(size as i64);
     755      2352904 :             }
     756      2352910 :             (buf, res)
     757              :         })
     758      2352910 :     }
     759              : 
     760              :     /// The function aborts the process if the error is fatal.
     761      3407242 :     async fn write_at<B: IoBuf + Send>(
     762      3407242 :         &self,
     763      3407242 :         buf: FullSlice<B>,
     764      3407242 :         offset: u64,
     765      3407242 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     766      3407242 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     767      3407242 :         let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
     768      3407242 :         let result = result.maybe_fatal_err("write_at");
     769      3407242 :         (slice, result)
     770      3407242 :     }
     771              : 
     772      3407242 :     async fn write_at_inner<B: IoBuf + Send>(
     773      3407242 :         &self,
     774      3407242 :         buf: FullSlice<B>,
     775      3407242 :         offset: u64,
     776      3407242 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     777      3407242 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     778      3407242 :         let file_guard = match self.lock_file().await {
     779      3407242 :             Ok(file_guard) => file_guard,
     780            0 :             Err(e) => return (buf, Err(e)),
     781              :         };
     782      3407242 :         observe_duration!(StorageIoOperation::Write, {
     783      3407242 :             let ((_file_guard, buf), result) =
     784      3407242 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     785      3407242 :             if let Ok(size) = result {
     786      3407236 :                 STORAGE_IO_SIZE
     787      3407236 :                     .with_label_values(&[
     788      3407236 :                         "write",
     789      3407236 :                         &self.tenant_id,
     790      3407236 :                         &self.shard_id,
     791      3407236 :                         &self.timeline_id,
     792      3407236 :                     ])
     793      3407236 :                     .add(size as i64);
     794      3407236 :             }
     795      3407242 :             (buf, result)
     796              :         })
     797      3407242 :     }
     798              : }
     799              : 
     800              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     801      2351602 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     802      2351602 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     803      2351602 :     mut offset: u64,
     804      2351602 :     mut read_at: F,
     805      2351602 : ) -> (Buf, std::io::Result<()>)
     806      2351602 : where
     807      2351602 :     Buf: IoBufMut + Send,
     808      2351602 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     809      2351602 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     810      2351602 : {
     811      4703210 :     while buf.bytes_total() != 0 {
     812              :         let res;
     813      2509375 :         (buf, res) = read_at(buf, offset).await;
     814            0 :         match res {
     815            6 :             Ok(0) => break,
     816      2351608 :             Ok(n) => {
     817      2351608 :                 buf = buf.slice(n..);
     818      2351608 :                 offset += n as u64;
     819      2351608 :             }
     820            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     821            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     822              :         }
     823              :     }
     824              :     // NB: don't use `buf.is_empty()` here; it is from the
     825              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     826              :     // returned by it only covers the initialized portion of `buf`.
     827              :     // Whereas we're interested in ensuring that we filled the entire
     828              :     // buffer that the user passed in.
     829      2351602 :     if buf.bytes_total() != 0 {
     830            6 :         (
     831            6 :             buf.into_inner(),
     832            6 :             Err(std::io::Error::new(
     833            6 :                 std::io::ErrorKind::UnexpectedEof,
     834            6 :                 "failed to fill whole buffer",
     835            6 :             )),
     836            6 :         )
     837              :     } else {
     838      2351596 :         assert_eq!(buf.len(), buf.bytes_total());
     839      2351596 :         (buf.into_inner(), Ok(()))
     840              :     }
     841      2351602 : }
     842              : 
     843              : #[cfg(test)]
     844              : mod test_read_exact_at_impl {
     845              : 
     846              :     use std::{collections::VecDeque, sync::Arc};
     847              : 
     848              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     849              : 
     850              :     use super::read_exact_at_impl;
     851              : 
     852              :     struct Expectation {
     853              :         offset: u64,
     854              :         bytes_total: usize,
     855              :         result: std::io::Result<Vec<u8>>,
     856              :     }
     857              :     struct MockReadAt {
     858              :         expectations: VecDeque<Expectation>,
     859              :     }
     860              : 
     861              :     impl MockReadAt {
     862           36 :         async fn read_at(
     863           36 :             &mut self,
     864           36 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     865           36 :             offset: u64,
     866           36 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     867           36 :             let exp = self
     868           36 :                 .expectations
     869           36 :                 .pop_front()
     870           36 :                 .expect("read_at called but we have no expectations left");
     871           36 :             assert_eq!(exp.offset, offset);
     872           36 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     873           36 :             match exp.result {
     874           36 :                 Ok(bytes) => {
     875           36 :                     assert!(bytes.len() <= buf.bytes_total());
     876           36 :                     buf.put_slice(&bytes);
     877           36 :                     (buf, Ok(bytes.len()))
     878              :                 }
     879            0 :                 Err(e) => (buf, Err(e)),
     880              :             }
     881           36 :         }
     882              :     }
     883              : 
     884              :     impl Drop for MockReadAt {
     885           24 :         fn drop(&mut self) {
     886           24 :             assert_eq!(self.expectations.len(), 0);
     887           24 :         }
     888              :     }
     889              : 
     890              :     #[tokio::test]
     891            6 :     async fn test_basic() {
     892            6 :         let buf = Vec::with_capacity(5).slice_full();
     893            6 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     894            6 :             expectations: VecDeque::from(vec![Expectation {
     895            6 :                 offset: 0,
     896            6 :                 bytes_total: 5,
     897            6 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     898            6 :             }]),
     899            6 :         }));
     900            6 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     901            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     902            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     903            6 :         })
     904            6 :         .await;
     905            6 :         assert!(res.is_ok());
     906            6 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     907            6 :     }
     908              : 
     909              :     #[tokio::test]
     910            6 :     async fn test_empty_buf_issues_no_syscall() {
     911            6 :         let buf = Vec::new().slice_full();
     912            6 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     913            6 :             expectations: VecDeque::new(),
     914            6 :         }));
     915            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     916            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     917            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     918            6 :         })
     919            6 :         .await;
     920            6 :         assert!(res.is_ok());
     921            6 :     }
     922              : 
     923              :     #[tokio::test]
     924            6 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     925            6 :         let buf = Vec::with_capacity(4).slice_full();
     926            6 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     927            6 :             expectations: VecDeque::from(vec![
     928            6 :                 Expectation {
     929            6 :                     offset: 0,
     930            6 :                     bytes_total: 4,
     931            6 :                     result: Ok(vec![b'a', b'b']),
     932            6 :                 },
     933            6 :                 Expectation {
     934            6 :                     offset: 2,
     935            6 :                     bytes_total: 2,
     936            6 :                     result: Ok(vec![b'c', b'd']),
     937            6 :                 },
     938            6 :             ]),
     939            6 :         }));
     940           12 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     941           12 :             let mock_read_at = Arc::clone(&mock_read_at);
     942           12 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     943           12 :         })
     944            6 :         .await;
     945            6 :         assert!(res.is_ok());
     946            6 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     947            6 :     }
     948              : 
     949              :     #[tokio::test]
     950            6 :     async fn test_eof_before_buffer_full() {
     951            6 :         let buf = Vec::with_capacity(3).slice_full();
     952            6 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     953            6 :             expectations: VecDeque::from(vec![
     954            6 :                 Expectation {
     955            6 :                     offset: 0,
     956            6 :                     bytes_total: 3,
     957            6 :                     result: Ok(vec![b'a']),
     958            6 :                 },
     959            6 :                 Expectation {
     960            6 :                     offset: 1,
     961            6 :                     bytes_total: 2,
     962            6 :                     result: Ok(vec![b'b']),
     963            6 :                 },
     964            6 :                 Expectation {
     965            6 :                     offset: 2,
     966            6 :                     bytes_total: 1,
     967            6 :                     result: Ok(vec![]),
     968            6 :                 },
     969            6 :             ]),
     970            6 :         }));
     971           18 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     972           18 :             let mock_read_at = Arc::clone(&mock_read_at);
     973           18 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     974           18 :         })
     975            6 :         .await;
     976            6 :         let Err(err) = res else {
     977            6 :             panic!("should return an error");
     978            6 :         };
     979            6 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     980            6 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     981            6 :         // buffer contents on error are unspecified
     982            6 :     }
     983              : }
     984              : 
     985              : struct FileGuard {
     986              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
     987              : }
     988              : 
     989              : impl AsRef<OwnedFd> for FileGuard {
     990      5773385 :     fn as_ref(&self) -> &OwnedFd {
     991      5773385 :         // This unwrap is safe because we only create `FileGuard`s
     992      5773385 :         // if we know that the file is Some.
     993      5773385 :         self.slot_guard.file.as_ref().unwrap()
     994      5773385 :     }
     995              : }
     996              : 
     997              : impl FileGuard {
     998              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     999      2887914 :     fn with_std_file<F, R>(&self, with: F) -> R
    1000      2887914 :     where
    1001      2887914 :         F: FnOnce(&File) -> R,
    1002      2887914 :     {
    1003      2887914 :         // SAFETY:
    1004      2887914 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1005      2887914 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
    1006      2887914 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1007      2887914 :         let res = with(&file);
    1008      2887914 :         let _ = file.into_raw_fd();
    1009      2887914 :         res
    1010      2887914 :     }
    1011              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1012           12 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
    1013           12 :     where
    1014           12 :         F: FnOnce(&mut File) -> R,
    1015           12 :     {
    1016           12 :         // SAFETY:
    1017           12 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1018           12 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1019           12 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1020           12 :         let res = with(&mut file);
    1021           12 :         let _ = file.into_raw_fd();
    1022           12 :         res
    1023           12 :     }
    1024              : }
    1025              : 
    1026              : impl tokio_epoll_uring::IoFd for FileGuard {
    1027      2885459 :     unsafe fn as_fd(&self) -> RawFd {
    1028      2885459 :         let owned_fd: &OwnedFd = self.as_ref();
    1029      2885459 :         owned_fd.as_raw_fd()
    1030      2885459 :     }
    1031              : }
    1032              : 
    1033              : #[cfg(test)]
    1034              : impl VirtualFile {
    1035        62748 :     pub(crate) async fn read_blk(
    1036        62748 :         &self,
    1037        62748 :         blknum: u32,
    1038        62748 :         ctx: &RequestContext,
    1039        62748 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1040        62748 :         use crate::page_cache::PAGE_SZ;
    1041        62748 :         let slice = Vec::with_capacity(PAGE_SZ).slice_full();
    1042        62748 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1043        62748 :         let slice = self
    1044        62748 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1045        31857 :             .await?;
    1046        62748 :         Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
    1047        62748 :     }
    1048              : 
    1049          672 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
    1050          672 :         let mut tmp = vec![0; 128];
    1051              :         loop {
    1052         1332 :             let slice = tmp.slice(..128);
    1053         1332 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
    1054            6 :             match res {
    1055          666 :                 Ok(0) => return Ok(()),
    1056          660 :                 Ok(n) => {
    1057          660 :                     self.pos += n as u64;
    1058          660 :                     buf.extend_from_slice(&slice[..n]);
    1059          660 :                 }
    1060            6 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1061            6 :                 Err(e) => return Err(e),
    1062              :             }
    1063          660 :             tmp = slice.into_inner();
    1064              :         }
    1065          672 :     }
    1066              : }
    1067              : 
    1068              : impl Drop for VirtualFile {
    1069              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1070        15137 :     fn drop(&mut self) {
    1071        15137 :         let handle = self.handle.get_mut();
    1072        15137 : 
    1073        15137 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1074        15137 :             if slot_guard.tag == tag {
    1075        15137 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1076        15137 :                 // there is also operation "close-by-replace" for closes done on eviction for
    1077        15137 :                 // comparison.
    1078        15137 :                 if let Some(fd) = slot_guard.file.take() {
    1079        13485 :                     STORAGE_IO_TIME_METRIC
    1080        13485 :                         .get(StorageIoOperation::Close)
    1081        13485 :                         .observe_closure_duration(|| drop(fd));
    1082        13485 :                 }
    1083        15137 :             }
    1084        15137 :         }
    1085        15137 : 
    1086        15137 :         // We don't have async drop so we cannot directly await the lock here.
    1087        15137 :         // Instead, first do a best-effort attempt at closing the underlying
    1088        15137 :         // file descriptor by using `try_write`, and if that fails, spawn
    1089        15137 :         // a tokio task to do it asynchronously: we just want it to be
    1090        15137 :         // cleaned up eventually.
    1091        15137 :         // Most of the time, the `try_lock` should succeed though,
    1092        15137 :         // as we have `&mut self` access. In other words, if the slot
    1093        15137 :         // is still occupied by our file, there should be no access from
    1094        15137 :         // other I/O operations; the only other possible place to lock
    1095        15137 :         // the slot is the lock algorithm looking for free slots.
    1096        15137 :         let slot = &get_open_files().slots[handle.index];
    1097        15137 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1098        15137 :             clean_slot(slot, slot_guard, handle.tag);
    1099        15137 :         } else {
    1100            0 :             let tag = handle.tag;
    1101            0 :             tokio::spawn(async move {
    1102            0 :                 let slot_guard = slot.inner.write().await;
    1103            0 :                 clean_slot(slot, slot_guard, tag);
    1104            0 :             });
    1105            0 :         };
    1106        15137 :     }
    1107              : }
    1108              : 
    1109              : impl OwnedAsyncWriter for VirtualFile {
    1110              :     #[inline(always)]
    1111        19791 :     async fn write_all<Buf: IoBuf + Send>(
    1112        19791 :         &mut self,
    1113        19791 :         buf: FullSlice<Buf>,
    1114        19791 :         ctx: &RequestContext,
    1115        19791 :     ) -> std::io::Result<(usize, FullSlice<Buf>)> {
    1116        19791 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
    1117        19791 :         res.map(move |v| (v, buf))
    1118        19791 :     }
    1119              : }
    1120              : 
    1121              : impl OpenFiles {
    1122          594 :     fn new(num_slots: usize) -> OpenFiles {
    1123          594 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1124         5940 :         for _ in 0..num_slots {
    1125         5940 :             let slot = Slot {
    1126         5940 :                 recently_used: AtomicBool::new(false),
    1127         5940 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1128         5940 :             };
    1129         5940 :             slots.push(slot);
    1130         5940 :         }
    1131              : 
    1132          594 :         OpenFiles {
    1133          594 :             next: AtomicUsize::new(0),
    1134          594 :             slots: Box::leak(slots),
    1135          594 :         }
    1136          594 :     }
    1137              : }
    1138              : 
    1139              : ///
    1140              : /// Initialize the virtual file module. This must be called once at page
    1141              : /// server startup.
    1142              : ///
    1143              : #[cfg(not(test))]
    1144            0 : pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) {
    1145            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1146            0 :         panic!("virtual_file::init called twice");
    1147            0 :     }
    1148            0 :     if set_io_buffer_alignment(io_buffer_alignment).is_err() {
    1149            0 :         panic!("IO buffer alignment ({io_buffer_alignment}) is not a power of two");
    1150            0 :     }
    1151            0 :     io_engine::init(engine);
    1152            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1153            0 : }
    1154              : 
    1155              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1156              : 
    1157              : // Get a handle to the global slots array.
    1158      5805997 : fn get_open_files() -> &'static OpenFiles {
    1159      5805997 :     //
    1160      5805997 :     // In unit tests, page server startup doesn't happen and no one calls
    1161      5805997 :     // virtual_file::init(). Initialize it here, with a small array.
    1162      5805997 :     //
    1163      5805997 :     // This applies to the virtual file tests below, but all other unit
    1164      5805997 :     // tests too, so the virtual file facility is always usable in
    1165      5805997 :     // unit tests.
    1166      5805997 :     //
    1167      5805997 :     if cfg!(test) {
    1168      5805997 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1169              :     } else {
    1170            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1171              :     }
    1172      5805997 : }
    1173              : 
    1174              : static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT);
    1175              : 
    1176              : /// Returns true if `x` is zero or a power of two.
    1177      1264815 : fn is_zero_or_power_of_two(x: usize) -> bool {
    1178      1264815 :     (x == 0) || ((x & (x - 1)) == 0)
    1179      1264815 : }
    1180              : 
    1181              : #[allow(unused)]
    1182            6 : pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> {
    1183            6 :     if is_zero_or_power_of_two(align) {
    1184            6 :         IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed);
    1185            6 :         Ok(())
    1186              :     } else {
    1187            0 :         Err(align)
    1188              :     }
    1189            6 : }
    1190              : 
    1191              : /// Gets the io buffer alignment requirement. Returns 0 if there is no requirement specified.
    1192              : ///
    1193              : /// This function should be used to check the raw config value.
    1194      1264833 : pub(crate) fn get_io_buffer_alignment_raw() -> usize {
    1195      1264833 :     let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed);
    1196      1264833 : 
    1197      1264833 :     if cfg!(test) {
    1198      1264833 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT";
    1199      1264833 :         if align == DEFAULT_IO_BUFFER_ALIGNMENT {
    1200      1264809 :             if let Some(test_align) = utils::env::var(env_var_name) {
    1201      1264809 :                 if is_zero_or_power_of_two(test_align) {
    1202      1264809 :                     test_align
    1203              :                 } else {
    1204            0 :                     panic!("IO buffer alignment ({test_align}) is not a power of two");
    1205              :                 }
    1206              :             } else {
    1207            0 :                 crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT
    1208              :             }
    1209              :         } else {
    1210           24 :             align
    1211              :         }
    1212              :     } else {
    1213            0 :         align
    1214              :     }
    1215      1264833 : }
    1216              : 
    1217              : /// Gets the io buffer alignment requirement. Returns 1 if the alignment config is set to zero.
    1218              : ///
    1219              : /// This function should be used for getting the actual alignment value to use.
    1220       623634 : pub(crate) fn get_io_buffer_alignment() -> usize {
    1221       623634 :     let align = get_io_buffer_alignment_raw();
    1222       623634 :     if align == DEFAULT_IO_BUFFER_ALIGNMENT {
    1223       212640 :         1
    1224              :     } else {
    1225       410994 :         align
    1226              :     }
    1227       623634 : }
    1228              : 
    1229              : #[cfg(test)]
    1230              : mod tests {
    1231              :     use crate::context::DownloadBehavior;
    1232              :     use crate::task_mgr::TaskKind;
    1233              : 
    1234              :     use super::*;
    1235              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1236              :     use owned_buffers_io::slice::SliceMutExt;
    1237              :     use rand::seq::SliceRandom;
    1238              :     use rand::thread_rng;
    1239              :     use rand::Rng;
    1240              :     use std::io::Write;
    1241              :     use std::os::unix::fs::FileExt;
    1242              :     use std::sync::Arc;
    1243              : 
    1244              :     enum MaybeVirtualFile {
    1245              :         VirtualFile(VirtualFile),
    1246              :         File(File),
    1247              :     }
    1248              : 
    1249              :     impl From<VirtualFile> for MaybeVirtualFile {
    1250           18 :         fn from(vf: VirtualFile) -> Self {
    1251           18 :             MaybeVirtualFile::VirtualFile(vf)
    1252           18 :         }
    1253              :     }
    1254              : 
    1255              :     impl MaybeVirtualFile {
    1256         1212 :         async fn read_exact_at(
    1257         1212 :             &self,
    1258         1212 :             mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
    1259         1212 :             offset: u64,
    1260         1212 :             ctx: &RequestContext,
    1261         1212 :         ) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
    1262         1212 :             match self {
    1263          609 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1264          606 :                 MaybeVirtualFile::File(file) => {
    1265          606 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1266          606 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1267              :                 }
    1268              :             }
    1269         1212 :         }
    1270           24 :         async fn write_all_at<Buf: IoBuf + Send>(
    1271           24 :             &self,
    1272           24 :             buf: FullSlice<Buf>,
    1273           24 :             offset: u64,
    1274           24 :             ctx: &RequestContext,
    1275           24 :         ) -> Result<(), Error> {
    1276           24 :             match self {
    1277           12 :                 MaybeVirtualFile::VirtualFile(file) => {
    1278           12 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1279           12 :                     res
    1280              :                 }
    1281           12 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1282              :             }
    1283           24 :         }
    1284          108 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1285          108 :             match self {
    1286           54 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1287           54 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1288              :             }
    1289          108 :         }
    1290           24 :         async fn write_all<Buf: IoBuf + Send>(
    1291           24 :             &mut self,
    1292           24 :             buf: FullSlice<Buf>,
    1293           24 :             ctx: &RequestContext,
    1294           24 :         ) -> Result<(), Error> {
    1295           24 :             match self {
    1296           12 :                 MaybeVirtualFile::VirtualFile(file) => {
    1297           12 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1298           12 :                     res.map(|_| ())
    1299              :                 }
    1300           12 :                 MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
    1301              :             }
    1302           24 :         }
    1303              : 
    1304              :         // Helper function to slurp contents of a file, starting at the current position,
    1305              :         // into a string
    1306         1326 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1307         1326 :             use std::io::Read;
    1308         1326 :             let mut buf = String::new();
    1309         1326 :             match self {
    1310          672 :                 MaybeVirtualFile::VirtualFile(file) => {
    1311          672 :                     let mut buf = Vec::new();
    1312          678 :                     file.read_to_end(&mut buf, ctx).await?;
    1313          666 :                     return Ok(String::from_utf8(buf).unwrap());
    1314              :                 }
    1315          654 :                 MaybeVirtualFile::File(file) => {
    1316          654 :                     file.read_to_string(&mut buf)?;
    1317              :                 }
    1318              :             }
    1319          648 :             Ok(buf)
    1320         1326 :         }
    1321              : 
    1322              :         // Helper function to slurp a portion of a file into a string
    1323         1212 :         async fn read_string_at(
    1324         1212 :             &mut self,
    1325         1212 :             pos: u64,
    1326         1212 :             len: usize,
    1327         1212 :             ctx: &RequestContext,
    1328         1212 :         ) -> Result<String, Error> {
    1329         1212 :             let slice = Vec::with_capacity(len).slice_full();
    1330         1212 :             assert_eq!(slice.bytes_total(), len);
    1331         1212 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1332         1212 :             let vec = slice.into_inner();
    1333         1212 :             assert_eq!(vec.len(), len);
    1334         1212 :             Ok(String::from_utf8(vec).unwrap())
    1335         1212 :         }
    1336              :     }
    1337              : 
    1338              :     #[tokio::test]
    1339            6 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1340            6 :         // The real work is done in the test_files() helper function. This
    1341            6 :         // allows us to run the same set of tests against a native File, and
    1342            6 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1343            6 :         // but this allows us to verify that the operations return the same
    1344            6 :         // results with VirtualFiles as with native Files. (Except that with
    1345            6 :         // native files, you will run out of file descriptors if the ulimit
    1346            6 :         // is low enough.)
    1347            6 :         struct A;
    1348            6 : 
    1349            6 :         impl Adapter for A {
    1350          618 :             async fn open(
    1351          618 :                 path: Utf8PathBuf,
    1352          618 :                 opts: OpenOptions,
    1353          618 :                 ctx: &RequestContext,
    1354          618 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1355          618 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1356          618 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1357          618 :             }
    1358            6 :         }
    1359         1593 :         test_files::<A>("virtual_files").await
    1360            6 :     }
    1361              : 
    1362              :     #[tokio::test]
    1363            6 :     async fn test_physical_files() -> anyhow::Result<()> {
    1364            6 :         struct B;
    1365            6 : 
    1366            6 :         impl Adapter for B {
    1367          618 :             async fn open(
    1368          618 :                 path: Utf8PathBuf,
    1369          618 :                 opts: OpenOptions,
    1370          618 :                 _ctx: &RequestContext,
    1371          618 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1372          618 :                 Ok(MaybeVirtualFile::File({
    1373          618 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1374          618 :                     File::from(owned_fd)
    1375            6 :                 }))
    1376          618 :             }
    1377            6 :         }
    1378            6 : 
    1379          312 :         test_files::<B>("physical_files").await
    1380            6 :     }
    1381              : 
    1382              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
    1383              :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
    1384              :     /// in trait which benefits from the new lifetime capture rules already.
    1385              :     trait Adapter {
    1386              :         async fn open(
    1387              :             path: Utf8PathBuf,
    1388              :             opts: OpenOptions,
    1389              :             ctx: &RequestContext,
    1390              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1391              :     }
    1392              : 
    1393           12 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    1394           12 :     where
    1395           12 :         A: Adapter,
    1396           12 :     {
    1397           12 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1398           12 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1399           12 :         std::fs::create_dir_all(&testdir)?;
    1400              : 
    1401           12 :         let path_a = testdir.join("file_a");
    1402           12 :         let mut file_a = A::open(
    1403           12 :             path_a.clone(),
    1404           12 :             OpenOptions::new()
    1405           12 :                 .write(true)
    1406           12 :                 .create(true)
    1407           12 :                 .truncate(true)
    1408           12 :                 .to_owned(),
    1409           12 :             &ctx,
    1410           12 :         )
    1411           12 :         .await?;
    1412           12 :         file_a
    1413           12 :             .write_all(b"foobar".to_vec().slice_len(), &ctx)
    1414            3 :             .await?;
    1415              : 
    1416              :         // cannot read from a file opened in write-only mode
    1417           12 :         let _ = file_a.read_string(&ctx).await.unwrap_err();
    1418              : 
    1419              :         // Close the file and re-open for reading
    1420           12 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
    1421              : 
    1422              :         // cannot write to a file opened in read-only mode
    1423           12 :         let _ = file_a
    1424           12 :             .write_all(b"bar".to_vec().slice_len(), &ctx)
    1425            3 :             .await
    1426           12 :             .unwrap_err();
    1427           12 : 
    1428           12 :         // Try simple read
    1429           12 :         assert_eq!("foobar", file_a.read_string(&ctx).await?);
    1430              : 
    1431              :         // It's positioned at the EOF now.
    1432           12 :         assert_eq!("", file_a.read_string(&ctx).await?);
    1433              : 
    1434              :         // Test seeks.
    1435           12 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1436           12 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1437              : 
    1438           12 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1439           12 :         assert_eq!("ar", file_a.read_string(&ctx).await?);
    1440              : 
    1441           12 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1442           12 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1443           12 :         assert_eq!("bar", file_a.read_string(&ctx).await?);
    1444              : 
    1445           12 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1446           12 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1447              : 
    1448              :         // Test erroneous seeks to before byte 0
    1449           12 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1450           12 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1451           12 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1452           12 : 
    1453           12 :         // the erroneous seek should have left the position unchanged
    1454           12 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1455              : 
    1456              :         // Create another test file, and try FileExt functions on it.
    1457           12 :         let path_b = testdir.join("file_b");
    1458           12 :         let mut file_b = A::open(
    1459           12 :             path_b.clone(),
    1460           12 :             OpenOptions::new()
    1461           12 :                 .read(true)
    1462           12 :                 .write(true)
    1463           12 :                 .create(true)
    1464           12 :                 .truncate(true)
    1465           12 :                 .to_owned(),
    1466           12 :             &ctx,
    1467           12 :         )
    1468            6 :         .await?;
    1469           12 :         file_b
    1470           12 :             .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
    1471            3 :             .await?;
    1472           12 :         file_b
    1473           12 :             .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
    1474            3 :             .await?;
    1475              : 
    1476           12 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1477              : 
    1478              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1479              :         // open the same file many times. The effect is the same.)
    1480              :         //
    1481              :         // leave file_a positioned at offset 1 before we start
    1482           12 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1483              : 
    1484           12 :         let mut vfiles = Vec::new();
    1485         1212 :         for _ in 0..100 {
    1486         1200 :             let mut vfile = A::open(
    1487         1200 :                 path_b.clone(),
    1488         1200 :                 OpenOptions::new().read(true).to_owned(),
    1489         1200 :                 &ctx,
    1490         1200 :             )
    1491          600 :             .await?;
    1492         1200 :             assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
    1493         1200 :             vfiles.push(vfile);
    1494              :         }
    1495              : 
    1496              :         // make sure we opened enough files to definitely cause evictions.
    1497           12 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1498              : 
    1499              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1500              :         // from it again. We left the file positioned at offset 1 above.
    1501           12 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1502              : 
    1503              :         // Check that all the other FDs still work too. Use them in random order for
    1504              :         // good measure.
    1505           12 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1506         1200 :         for vfile in vfiles.iter_mut() {
    1507         1200 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1508              :         }
    1509              : 
    1510           12 :         Ok(())
    1511           12 :     }
    1512              : 
    1513              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1514              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1515              :     /// VirtualFile from multiple threads concurrently.
    1516              :     #[tokio::test]
    1517            6 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1518            6 :         const SIZE: usize = 8 * 1024;
    1519            6 :         const VIRTUAL_FILES: usize = 100;
    1520            6 :         const THREADS: usize = 100;
    1521            6 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1522            6 : 
    1523            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1524            6 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1525            6 :         std::fs::create_dir_all(&testdir)?;
    1526            6 : 
    1527            6 :         // Create a test file.
    1528            6 :         let test_file_path = testdir.join("concurrency_test_file");
    1529            6 :         {
    1530            6 :             let file = File::create(&test_file_path)?;
    1531            6 :             file.write_all_at(&SAMPLE, 0)?;
    1532            6 :         }
    1533            6 : 
    1534            6 :         // Open the file many times.
    1535            6 :         let mut files = Vec::new();
    1536          606 :         for _ in 0..VIRTUAL_FILES {
    1537          600 :             let f = VirtualFile::open_with_options(
    1538          600 :                 &test_file_path,
    1539          600 :                 OpenOptions::new().read(true),
    1540          600 :                 &ctx,
    1541          600 :             )
    1542          303 :             .await?;
    1543          600 :             files.push(f);
    1544            6 :         }
    1545            6 :         let files = Arc::new(files);
    1546            6 : 
    1547            6 :         // Launch many threads, and use the virtual files concurrently in random order.
    1548            6 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1549            6 :             .worker_threads(THREADS)
    1550            6 :             .thread_name("test_vfile_concurrency thread")
    1551            6 :             .build()
    1552            6 :             .unwrap();
    1553            6 :         let mut hdls = Vec::new();
    1554          606 :         for _threadno in 0..THREADS {
    1555          600 :             let files = files.clone();
    1556          600 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1557          600 :             let hdl = rt.spawn(async move {
    1558          600 :                 let mut buf = vec![0u8; SIZE];
    1559          600 :                 let mut rng = rand::rngs::OsRng;
    1560       600000 :                 for _ in 1..1000 {
    1561       599400 :                     let f = &files[rng.gen_range(0..files.len())];
    1562       599400 :                     buf = f
    1563       599400 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1564      1602787 :                         .await
    1565       599400 :                         .unwrap()
    1566       599400 :                         .into_inner();
    1567       599400 :                     assert!(buf == SAMPLE);
    1568            6 :                 }
    1569          600 :             });
    1570          600 :             hdls.push(hdl);
    1571          600 :         }
    1572          606 :         for hdl in hdls {
    1573          600 :             hdl.await?;
    1574            6 :         }
    1575            6 :         std::mem::forget(rt);
    1576            6 : 
    1577            6 :         Ok(())
    1578            6 :     }
    1579              : 
    1580              :     #[tokio::test]
    1581            6 :     async fn test_atomic_overwrite_basic() {
    1582            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1583            6 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1584            6 :         std::fs::create_dir_all(&testdir).unwrap();
    1585            6 : 
    1586            6 :         let path = testdir.join("myfile");
    1587            6 :         let tmp_path = testdir.join("myfile.tmp");
    1588            6 : 
    1589            6 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1590            6 :             .await
    1591            6 :             .unwrap();
    1592            6 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1593            6 :         let post = file.read_string(&ctx).await.unwrap();
    1594            6 :         assert_eq!(post, "foo");
    1595            6 :         assert!(!tmp_path.exists());
    1596            6 :         drop(file);
    1597            6 : 
    1598            6 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1599            6 :             .await
    1600            6 :             .unwrap();
    1601            6 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1602            6 :         let post = file.read_string(&ctx).await.unwrap();
    1603            6 :         assert_eq!(post, "bar");
    1604            6 :         assert!(!tmp_path.exists());
    1605            6 :         drop(file);
    1606            6 :     }
    1607              : 
    1608              :     #[tokio::test]
    1609            6 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1610            6 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1611            6 :         let testdir =
    1612            6 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1613            6 :         std::fs::create_dir_all(&testdir).unwrap();
    1614            6 : 
    1615            6 :         let path = testdir.join("myfile");
    1616            6 :         let tmp_path = testdir.join("myfile.tmp");
    1617            6 : 
    1618            6 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1619            6 :         assert!(tmp_path.exists());
    1620            6 : 
    1621            6 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1622            6 :             .await
    1623            6 :             .unwrap();
    1624            6 : 
    1625            6 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1626            6 :         let post = file.read_string(&ctx).await.unwrap();
    1627            6 :         assert_eq!(post, "foo");
    1628            6 :         assert!(!tmp_path.exists());
    1629            6 :         drop(file);
    1630            6 :     }
    1631              : }
        

Generated by: LCOV version 2.1-beta