LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 94.1 % 988 930
Test Date: 2024-08-21 17:32:46 Functions: 92.0 % 213 196

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::context::RequestContext;
      14              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      15              : 
      16              : use crate::page_cache::{PageWriteGuard, PAGE_SZ};
      17              : use crate::tenant::TENANTS_SEGMENT_NAME;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use once_cell::sync::OnceCell;
      20              : use owned_buffers_io::io_buf_ext::FullSlice;
      21              : use pageserver_api::shard::TenantShardId;
      22              : use std::fs::File;
      23              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      24              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      25              : 
      26              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      27              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      28              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      29              : use tokio::time::Instant;
      30              : 
      31              : pub use pageserver_api::models::virtual_file as api;
      32              : pub(crate) mod io_engine;
      33              : pub use io_engine::feature_test as io_engine_feature_test;
      34              : pub use io_engine::io_engine_for_bench;
      35              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      36              : mod metadata;
      37              : mod open_options;
      38              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      39              : pub(crate) use api::DirectIoMode;
      40              : pub(crate) use io_engine::IoEngineKind;
      41              : pub(crate) use metadata::Metadata;
      42              : pub(crate) use open_options::*;
      43              : 
      44              : pub(crate) mod owned_buffers_io {
      45              :     //! Abstractions for IO with owned buffers.
      46              :     //!
      47              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      48              :     //! reason we need this abstraction.
      49              :     //!
      50              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      51              :     //! but for the time being we're proving out the primitives in the neon.git repo
      52              :     //! for faster iteration.
      53              : 
      54              :     pub(crate) mod io_buf_ext;
      55              :     pub(crate) mod slice;
      56              :     pub(crate) mod write;
      57              :     pub(crate) mod util {
      58              :         pub(crate) mod size_tracking_writer;
      59              :     }
      60              : }
      61              : 
      62              : ///
      63              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      64              : /// the underlying file is closed if the system is low on file descriptors,
      65              : /// and re-opened when it's accessed again.
      66              : ///
      67              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      68              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      69              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      70              : /// require a mutable reference, because they modify the "current position".
      71              : ///
      72              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      73              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      74              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      75              : /// 'tag' field is used to detect whether the handle still is valid or not.
      76              : ///
      77              : #[derive(Debug)]
      78              : pub struct VirtualFile {
      79              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      80              :     /// might contain our File, or it may be empty, or it may contain a File that
      81              :     /// belongs to a different VirtualFile.
      82              :     handle: RwLock<SlotHandle>,
      83              : 
      84              :     /// Current file position
      85              :     pos: u64,
      86              : 
      87              :     /// File path and options to use to open it.
      88              :     ///
      89              :     /// Note: this only contains the options needed to re-open it. For example,
      90              :     /// if a new file is created, we only pass the create flag when it's initially
      91              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      92              :     /// storing it here.
      93              :     pub path: Utf8PathBuf,
      94              :     open_options: OpenOptions,
      95              : 
      96              :     // These are strings because we only use them for metrics, and those expect strings.
      97              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      98              :     // strings.
      99              :     tenant_id: String,
     100              :     shard_id: String,
     101              :     timeline_id: String,
     102              : }
     103              : 
     104              : #[derive(Debug, PartialEq, Clone, Copy)]
     105              : struct SlotHandle {
     106              :     /// Index into OPEN_FILES.slots
     107              :     index: usize,
     108              : 
     109              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     110              :     /// been recycled and no longer contains the FD for this virtual file.
     111              :     tag: u64,
     112              : }
     113              : 
     114              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     115              : /// are currently open. Each slot in the array is protected by a separate lock,
     116              : /// so that different files can be accessed independently. The lock must be held
     117              : /// in write mode to replace the slot with a different file, but a read mode
     118              : /// is enough to operate on the file, whether you're reading or writing to it.
     119              : ///
     120              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     121              : /// the virtual_file::init() function. It must be called exactly once at page
     122              : /// server startup.
     123              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     124              : 
     125              : struct OpenFiles {
     126              :     slots: &'static [Slot],
     127              : 
     128              :     /// clock arm for the clock algorithm
     129              :     next: AtomicUsize,
     130              : }
     131              : 
     132              : struct Slot {
     133              :     inner: RwLock<SlotInner>,
     134              : 
     135              :     /// has this file been used since last clock sweep?
     136              :     recently_used: AtomicBool,
     137              : }
     138              : 
     139              : struct SlotInner {
     140              :     /// Counter that's incremented every time a different file is stored here.
     141              :     /// To avoid the ABA problem.
     142              :     tag: u64,
     143              : 
     144              :     /// the underlying file
     145              :     file: Option<OwnedFd>,
     146              : }
     147              : 
     148              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     149              : struct PageWriteGuardBuf {
     150              :     page: PageWriteGuard<'static>,
     151              : }
     152              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     153              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     154              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     155              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     156              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     157       247662 :     fn stable_ptr(&self) -> *const u8 {
     158       247662 :         self.page.as_ptr()
     159       247662 :     }
     160       464283 :     fn bytes_init(&self) -> usize {
     161       464283 :         self.page.len()
     162       464283 :     }
     163       185580 :     fn bytes_total(&self) -> usize {
     164       185580 :         self.page.len()
     165       185580 :     }
     166              : }
     167              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     168              : // hence it's safe to hand out the `stable_mut_ptr()`.
     169              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     170        92901 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     171        92901 :         self.page.as_mut_ptr()
     172        92901 :     }
     173              : 
     174        61860 :     unsafe fn set_init(&mut self, pos: usize) {
     175        61860 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     176        61860 :         assert!(pos <= self.page.len());
     177        61860 :     }
     178              : }
     179              : 
     180              : impl OpenFiles {
     181              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     182              :     ///
     183              :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
     184              :     /// recently_used has been set. It's all ready for reuse.
     185       190966 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     186       190966 :         //
     187       190966 :         // Run the clock algorithm to find a slot to replace.
     188       190966 :         //
     189       190966 :         let num_slots = self.slots.len();
     190       190966 :         let mut retries = 0;
     191              :         let mut slot;
     192              :         let mut slot_guard;
     193              :         let index;
     194      2114600 :         loop {
     195      2114600 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     196      2114600 :             slot = &self.slots[next];
     197      2114600 : 
     198      2114600 :             // If the recently_used flag on this slot is set, continue the clock
     199      2114600 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     200      2114600 :             // lock, also continue the clock sweep.
     201      2114600 :             //
     202      2114600 :             // We only continue in this manner for a while, though. If we loop
     203      2114600 :             // through the array twice without finding a victim, just pick the
     204      2114600 :             // next slot and wait until we can reuse it. This way, we avoid
     205      2114600 :             // spinning in the extreme case that all the slots are busy with an
     206      2114600 :             // I/O operation.
     207      2114600 :             if retries < num_slots * 2 {
     208      2039271 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     209      1833261 :                     if let Ok(guard) = slot.inner.try_write() {
     210       115637 :                         slot_guard = guard;
     211       115637 :                         index = next;
     212       115637 :                         break;
     213      1717624 :                     }
     214       206010 :                 }
     215      1923634 :                 retries += 1;
     216              :             } else {
     217        75329 :                 slot_guard = slot.inner.write().await;
     218        75329 :                 index = next;
     219        75329 :                 break;
     220              :             }
     221              :         }
     222              : 
     223              :         //
     224              :         // We now have the victim slot locked. If it was in use previously, close the
     225              :         // old file.
     226              :         //
     227       190966 :         if let Some(old_file) = slot_guard.file.take() {
     228       186439 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     229       186439 :             // distinguish the two.
     230       186439 :             STORAGE_IO_TIME_METRIC
     231       186439 :                 .get(StorageIoOperation::CloseByReplace)
     232       186439 :                 .observe_closure_duration(|| drop(old_file));
     233       186439 :         }
     234              : 
     235              :         // Prepare the slot for reuse and return it
     236       190966 :         slot_guard.tag += 1;
     237       190966 :         slot.recently_used.store(true, Ordering::Relaxed);
     238       190966 :         (
     239       190966 :             SlotHandle {
     240       190966 :                 index,
     241       190966 :                 tag: slot_guard.tag,
     242       190966 :             },
     243       190966 :             slot_guard,
     244       190966 :         )
     245       190966 :     }
     246              : }
     247              : 
     248              : /// Identify error types that should always terminate the process.  Other
     249              : /// error types may be eligible for retry.
     250            2 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     251            2 :     use nix::errno::Errno::*;
     252            2 :     match e.raw_os_error().map(nix::errno::from_i32) {
     253              :         Some(EIO) => {
     254              :             // Terminate on EIO because we no longer trust the device to store
     255              :             // data safely, or to uphold persistence guarantees on fsync.
     256            0 :             true
     257              :         }
     258              :         Some(EROFS) => {
     259              :             // Terminate on EROFS because a filesystem is usually remounted
     260              :             // readonly when it has experienced some critical issue, so the same
     261              :             // logic as EIO applies.
     262            0 :             true
     263              :         }
     264              :         Some(EACCES) => {
     265              :             // Terminate on EACCES because we should always have permissions
     266              :             // for our own data dir: if we don't, then we can't do our job and
     267              :             // need administrative intervention to fix permissions.  Terminating
     268              :             // is the best way to make sure we stop cleanly rather than going
     269              :             // into infinite retry loops, and will make it clear to the outside
     270              :             // world that we need help.
     271            0 :             true
     272              :         }
     273              :         _ => {
     274              :             // Treat all other local file I/O errors as retryable.  This includes:
     275              :             // - ENOSPC: we stay up and wait for eviction to free some space
     276              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     277              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     278            2 :             false
     279              :         }
     280              :     }
     281            2 : }
     282              : 
     283              : /// Call this when the local filesystem gives us an error with an external
     284              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     285              : /// bad storage or bad configuration, and we can't fix that from inside
     286              : /// a running process.
     287            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     288            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     289            0 :     std::process::abort();
     290              : }
     291              : 
     292              : pub(crate) trait MaybeFatalIo<T> {
     293              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     294              :     fn fatal_err(self, context: &str) -> T;
     295              : }
     296              : 
     297              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     298              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     299              :     ///
     300              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     301              :     /// not on ENOSPC.
     302      1126802 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     303      1126802 :         if let Err(e) = &self {
     304            2 :             if is_fatal_io_error(e) {
     305            0 :                 on_fatal_io_error(e, context);
     306            2 :             }
     307      1126800 :         }
     308      1126802 :         self
     309      1126802 :     }
     310              : 
     311              :     /// Terminate the process on any I/O error.
     312              :     ///
     313              :     /// This is appropriate for reads on files that we know exist: they should always work.
     314         2038 :     fn fatal_err(self, context: &str) -> T {
     315         2038 :         match self {
     316         2038 :             Ok(v) => v,
     317            0 :             Err(e) => {
     318            0 :                 on_fatal_io_error(&e, context);
     319              :             }
     320              :         }
     321         2038 :     }
     322              : }
     323              : 
     324              : /// Observe duration for the given storage I/O operation
     325              : ///
     326              : /// Unlike `observe_closure_duration`, this supports async,
     327              : /// where "support" means that we measure wall clock time.
     328              : macro_rules! observe_duration {
     329              :     ($op:expr, $($body:tt)*) => {{
     330              :         let instant = Instant::now();
     331              :         let result = $($body)*;
     332              :         let elapsed = instant.elapsed().as_secs_f64();
     333              :         STORAGE_IO_TIME_METRIC
     334              :             .get($op)
     335              :             .observe(elapsed);
     336              :         result
     337              :     }}
     338              : }
     339              : 
     340              : macro_rules! with_file {
     341              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     342              :         let $ident = $this.lock_file().await?;
     343              :         observe_duration!($op, $($body)*)
     344              :     }};
     345              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     346              :         let mut $ident = $this.lock_file().await?;
     347              :         observe_duration!($op, $($body)*)
     348              :     }};
     349              : }
     350              : 
     351              : impl VirtualFile {
     352              :     /// Open a file in read-only mode. Like File::open.
     353         2196 :     pub async fn open<P: AsRef<Utf8Path>>(
     354         2196 :         path: P,
     355         2196 :         ctx: &RequestContext,
     356         2196 :     ) -> Result<VirtualFile, std::io::Error> {
     357         2196 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     358         2196 :     }
     359              : 
     360              :     /// Create a new file for writing. If the file exists, it will be truncated.
     361              :     /// Like File::create.
     362         1449 :     pub async fn create<P: AsRef<Utf8Path>>(
     363         1449 :         path: P,
     364         1449 :         ctx: &RequestContext,
     365         1449 :     ) -> Result<VirtualFile, std::io::Error> {
     366         1449 :         Self::open_with_options(
     367         1449 :             path.as_ref(),
     368         1449 :             OpenOptions::new().write(true).create(true).truncate(true),
     369         1449 :             ctx,
     370         1449 :         )
     371          754 :         .await
     372         1449 :     }
     373              : 
     374              :     /// Open a file with given options.
     375              :     ///
     376              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     377              :     /// they will be applied also when the file is subsequently re-opened, not only
     378              :     /// on the first time. Make sure that's sane!
     379         5617 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     380         5617 :         path: P,
     381         5617 :         open_options: &OpenOptions,
     382         5617 :         _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     383         5617 :     ) -> Result<VirtualFile, std::io::Error> {
     384         5617 :         let path_ref = path.as_ref();
     385         5617 :         let path_str = path_ref.to_string();
     386         5617 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     387         5617 :         let (tenant_id, shard_id, timeline_id) =
     388         5617 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     389         4123 :                 let tenant_shard_part = parts[parts.len() - 4];
     390         4123 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     391         4123 :                     Ok(tenant_shard_id) => (
     392         4123 :                         tenant_shard_id.tenant_id.to_string(),
     393         4123 :                         format!("{}", tenant_shard_id.shard_slug()),
     394         4123 :                     ),
     395              :                     Err(_) => {
     396              :                         // Malformed path: this ID is just for observability, so tolerate it
     397              :                         // and pass through
     398            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     399              :                     }
     400              :                 };
     401         4123 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     402              :             } else {
     403         1494 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     404              :             };
     405         5617 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     406              : 
     407              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     408              :         // where our caller doesn't get to use the returned VirtualFile before its
     409              :         // slot gets re-used by someone else.
     410         5617 :         let file = observe_duration!(StorageIoOperation::Open, {
     411         5617 :             open_options.open(path_ref.as_std_path()).await?
     412              :         });
     413              : 
     414              :         // Strip all options other than read and write.
     415              :         //
     416              :         // It would perhaps be nicer to check just for the read and write flags
     417              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     418              :         // only to set them.
     419         5617 :         let mut reopen_options = open_options.clone();
     420         5617 :         reopen_options.create(false);
     421         5617 :         reopen_options.create_new(false);
     422         5617 :         reopen_options.truncate(false);
     423         5617 : 
     424         5617 :         let vfile = VirtualFile {
     425         5617 :             handle: RwLock::new(handle),
     426         5617 :             pos: 0,
     427         5617 :             path: path_ref.to_path_buf(),
     428         5617 :             open_options: reopen_options,
     429         5617 :             tenant_id,
     430         5617 :             shard_id,
     431         5617 :             timeline_id,
     432         5617 :         };
     433         5617 : 
     434         5617 :         // TODO: Under pressure, it's likely the slot will get re-used and
     435         5617 :         // the underlying file closed before the caller gets around to using it.
     436         5617 :         // => https://github.com/neondatabase/neon/issues/6065
     437         5617 :         slot_guard.file.replace(file);
     438         5617 : 
     439         5617 :         Ok(vfile)
     440         5617 :     }
     441              : 
     442              :     /// Async version of [`::utils::crashsafe::overwrite`].
     443              :     ///
     444              :     /// # NB:
     445              :     ///
     446              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     447              :     /// it did at an earlier time.
     448              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     449           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     450           28 :         final_path: Utf8PathBuf,
     451           28 :         tmp_path: Utf8PathBuf,
     452           28 :         content: B,
     453           28 :     ) -> std::io::Result<()> {
     454           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     455           28 :         // See https://github.com/neondatabase/neon/issues/6663
     456           28 : 
     457           28 :         tokio::task::spawn_blocking(move || {
     458           28 :             let slice_storage;
     459           28 :             let content_len = content.bytes_init();
     460           28 :             let content = if content.bytes_init() > 0 {
     461           28 :                 slice_storage = Some(content.slice(0..content_len));
     462           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     463              :             } else {
     464            0 :                 &[]
     465              :             };
     466           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     467           28 :         })
     468           28 :         .await
     469           28 :         .expect("blocking task is never aborted")
     470           28 :     }
     471              : 
     472              :     /// Call File::sync_all() on the underlying File.
     473         2687 :     pub async fn sync_all(&self) -> Result<(), Error> {
     474         2687 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     475         2687 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     476         2687 :             res
     477              :         })
     478         2687 :     }
     479              : 
     480              :     /// Call File::sync_data() on the underlying File.
     481            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     482            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     483            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     484            0 :             res
     485              :         })
     486            0 :     }
     487              : 
     488         1684 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     489         1684 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     490         1684 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     491         1684 :             res
     492              :         })
     493         1684 :     }
     494              : 
     495              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     496              :     /// opens it and evicts some other File if necessary. The passed parameter is
     497              :     /// assumed to be a function available for the physical `File`.
     498              :     ///
     499              :     /// We are doing it via a macro as Rust doesn't support async closures that
     500              :     /// take parameters with lifetimes.
     501      1627744 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     502      1627744 :         let open_files = get_open_files();
     503              : 
     504       185349 :         let mut handle_guard = {
     505              :             // Read the cached slot handle, and see if the slot that it points to still
     506              :             // contains our File.
     507              :             //
     508              :             // We only need to hold the handle lock while we read the current handle. If
     509              :             // another thread closes the file and recycles the slot for a different file,
     510              :             // we will notice that the handle we read is no longer valid and retry.
     511      1627744 :             let mut handle = *self.handle.read().await;
     512      1721261 :             loop {
     513      1721261 :                 // Check if the slot contains our File
     514      1721261 :                 {
     515      1721261 :                     let slot = &open_files.slots[handle.index];
     516      1721261 :                     let slot_guard = slot.inner.read().await;
     517      1721261 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     518              :                         // Found a cached file descriptor.
     519      1442395 :                         slot.recently_used.store(true, Ordering::Relaxed);
     520      1442395 :                         return Ok(FileGuard { slot_guard });
     521       278866 :                     }
     522              :                 }
     523              : 
     524              :                 // The slot didn't contain our File. We will have to open it ourselves,
     525              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     526              :                 // that no other thread will try to concurrently open the same file.
     527       278866 :                 let handle_guard = self.handle.write().await;
     528              : 
     529              :                 // If another thread changed the handle while we were not holding the lock,
     530              :                 // then the handle might now be valid again. Loop back to retry.
     531       278866 :                 if *handle_guard != handle {
     532        93517 :                     handle = *handle_guard;
     533        93517 :                     continue;
     534       185349 :                 }
     535       185349 :                 break handle_guard;
     536              :             }
     537              :         };
     538              : 
     539              :         // We need to open the file ourselves. The handle in the VirtualFile is
     540              :         // now locked in write-mode. Find a free slot to put it in.
     541       185349 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     542              : 
     543              :         // Re-open the physical file.
     544              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     545              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     546              :         // of the virtual file descriptor cache.
     547       185349 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     548       185349 :             self.open_options.open(self.path.as_std_path()).await?
     549              :         });
     550              : 
     551              :         // Store the File in the slot and update the handle in the VirtualFile
     552              :         // to point to it.
     553       185349 :         slot_guard.file.replace(file);
     554       185349 : 
     555       185349 :         *handle_guard = handle;
     556       185349 : 
     557       185349 :         return Ok(FileGuard {
     558       185349 :             slot_guard: slot_guard.downgrade(),
     559       185349 :         });
     560      1627744 :     }
     561              : 
     562           22 :     pub fn remove(self) {
     563           22 :         let path = self.path.clone();
     564           22 :         drop(self);
     565           22 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     566           22 :     }
     567              : 
     568         5086 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     569         5086 :         match pos {
     570         5076 :             SeekFrom::Start(offset) => {
     571         5076 :                 self.pos = offset;
     572         5076 :             }
     573            4 :             SeekFrom::End(offset) => {
     574            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     575            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     576              :             }
     577            6 :             SeekFrom::Current(offset) => {
     578            6 :                 let pos = self.pos as i128 + offset as i128;
     579            6 :                 if pos < 0 {
     580            2 :                     return Err(Error::new(
     581            2 :                         ErrorKind::InvalidInput,
     582            2 :                         "offset would be negative",
     583            2 :                     ));
     584            4 :                 }
     585            4 :                 if pos > u64::MAX as i128 {
     586            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     587            4 :                 }
     588            4 :                 self.pos = pos as u64;
     589              :             }
     590              :         }
     591         5082 :         Ok(self.pos)
     592         5086 :     }
     593              : 
     594              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     595              :     ///
     596              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     597       496359 :     pub async fn read_exact_at<Buf>(
     598       496359 :         &self,
     599       496359 :         slice: Slice<Buf>,
     600       496359 :         offset: u64,
     601       496359 :         ctx: &RequestContext,
     602       496359 :     ) -> Result<Slice<Buf>, Error>
     603       496359 :     where
     604       496359 :         Buf: IoBufMut + Send,
     605       496359 :     {
     606       496359 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     607       496359 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     608              :         } else {
     609            0 :             None
     610              :         };
     611              : 
     612       496359 :         let original_bounds = slice.bounds();
     613       496359 :         let (buf, res) =
     614       715185 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     615       496359 :         let res = res.map(|_| buf.slice(original_bounds));
     616              : 
     617       496359 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     618       496359 :             if let Ok(slice) = &res {
     619       496359 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     620       496359 :                 assert_eq!(original_bounds, returned_bounds);
     621            0 :             }
     622            0 :         }
     623              : 
     624       496359 :         res
     625       496359 :     }
     626              : 
     627              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     628        61860 :     pub async fn read_exact_at_page(
     629        61860 :         &self,
     630        61860 :         page: PageWriteGuard<'static>,
     631        61860 :         offset: u64,
     632        61860 :         ctx: &RequestContext,
     633        61860 :     ) -> Result<PageWriteGuard<'static>, Error> {
     634        61860 :         let buf = PageWriteGuardBuf { page }.slice_full();
     635        61860 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     636        61860 :         self.read_exact_at(buf, offset, ctx)
     637        37243 :             .await
     638        61860 :             .map(|slice| slice.into_inner().page)
     639        61860 :     }
     640              : 
     641              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     642            4 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     643            4 :         &self,
     644            4 :         buf: FullSlice<Buf>,
     645            4 :         mut offset: u64,
     646            4 :         ctx: &RequestContext,
     647            4 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     648            4 :         let buf = buf.into_raw_slice();
     649            4 :         let bounds = buf.bounds();
     650            4 :         let restore =
     651            4 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     652            4 :         let mut buf = buf;
     653            8 :         while !buf.is_empty() {
     654            4 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     655            4 :             buf = tmp.into_raw_slice();
     656            0 :             match res {
     657              :                 Ok(0) => {
     658            0 :                     return (
     659            0 :                         restore(buf),
     660            0 :                         Err(Error::new(
     661            0 :                             std::io::ErrorKind::WriteZero,
     662            0 :                             "failed to write whole buffer",
     663            0 :                         )),
     664            0 :                     );
     665              :                 }
     666            4 :                 Ok(n) => {
     667            4 :                     buf = buf.slice(n..);
     668            4 :                     offset += n as u64;
     669            4 :                 }
     670            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     671            0 :                 Err(e) => return (restore(buf), Err(e)),
     672              :             }
     673              :         }
     674            4 :         (restore(buf), Ok(()))
     675            4 :     }
     676              : 
     677              :     /// Writes `buf` to the file at the current offset.
     678              :     ///
     679              :     /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
     680      1126814 :     pub async fn write_all<Buf: IoBuf + Send>(
     681      1126814 :         &mut self,
     682      1126814 :         buf: FullSlice<Buf>,
     683      1126814 :         ctx: &RequestContext,
     684      1126814 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     685      1126814 :         let buf = buf.into_raw_slice();
     686      1126814 :         let bounds = buf.bounds();
     687      1126814 :         let restore =
     688      1126814 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     689      1126814 :         let nbytes = buf.len();
     690      1126814 :         let mut buf = buf;
     691      2253588 :         while !buf.is_empty() {
     692      1126776 :             let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
     693      1126776 :             buf = tmp.into_raw_slice();
     694            2 :             match res {
     695              :                 Ok(0) => {
     696            0 :                     return (
     697            0 :                         restore(buf),
     698            0 :                         Err(Error::new(
     699            0 :                             std::io::ErrorKind::WriteZero,
     700            0 :                             "failed to write whole buffer",
     701            0 :                         )),
     702            0 :                     );
     703              :                 }
     704      1126774 :                 Ok(n) => {
     705      1126774 :                     buf = buf.slice(n..);
     706      1126774 :                 }
     707            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     708            2 :                 Err(e) => return (restore(buf), Err(e)),
     709              :             }
     710              :         }
     711      1126812 :         (restore(buf), Ok(nbytes))
     712      1126814 :     }
     713              : 
     714      1126776 :     async fn write<B: IoBuf + Send>(
     715      1126776 :         &mut self,
     716      1126776 :         buf: FullSlice<B>,
     717      1126776 :         ctx: &RequestContext,
     718      1126776 :     ) -> (FullSlice<B>, Result<usize, std::io::Error>) {
     719      1126776 :         let pos = self.pos;
     720      1126776 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     721      1126776 :         let n = match res {
     722      1126774 :             Ok(n) => n,
     723            2 :             Err(e) => return (buf, Err(e)),
     724              :         };
     725      1126774 :         self.pos += n as u64;
     726      1126774 :         (buf, Ok(n))
     727      1126776 :     }
     728              : 
     729       496589 :     pub(crate) async fn read_at<Buf>(
     730       496589 :         &self,
     731       496589 :         buf: tokio_epoll_uring::Slice<Buf>,
     732       496589 :         offset: u64,
     733       496589 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     734       496589 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     735       496589 :     where
     736       496589 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     737       496589 :     {
     738       496589 :         let file_guard = match self.lock_file().await {
     739       496589 :             Ok(file_guard) => file_guard,
     740            0 :             Err(e) => return (buf, Err(e)),
     741              :         };
     742              : 
     743       496589 :         observe_duration!(StorageIoOperation::Read, {
     744       496589 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     745       496589 :             if let Ok(size) = res {
     746       496587 :                 STORAGE_IO_SIZE
     747       496587 :                     .with_label_values(&[
     748       496587 :                         "read",
     749       496587 :                         &self.tenant_id,
     750       496587 :                         &self.shard_id,
     751       496587 :                         &self.timeline_id,
     752       496587 :                     ])
     753       496587 :                     .add(size as i64);
     754       496587 :             }
     755       496589 :             (buf, res)
     756              :         })
     757       496589 :     }
     758              : 
     759              :     /// The function aborts the process if the error is fatal.
     760      1126780 :     async fn write_at<B: IoBuf + Send>(
     761      1126780 :         &self,
     762      1126780 :         buf: FullSlice<B>,
     763      1126780 :         offset: u64,
     764      1126780 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     765      1126780 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     766      1126780 :         let (slice, result) = self.write_at_inner(buf, offset, _ctx).await;
     767      1126780 :         let result = result.maybe_fatal_err("write_at");
     768      1126780 :         (slice, result)
     769      1126780 :     }
     770              : 
     771      1126780 :     async fn write_at_inner<B: IoBuf + Send>(
     772      1126780 :         &self,
     773      1126780 :         buf: FullSlice<B>,
     774      1126780 :         offset: u64,
     775      1126780 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     776      1126780 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     777      1126780 :         let file_guard = match self.lock_file().await {
     778      1126780 :             Ok(file_guard) => file_guard,
     779            0 :             Err(e) => return (buf, Err(e)),
     780              :         };
     781      1126780 :         observe_duration!(StorageIoOperation::Write, {
     782      1126780 :             let ((_file_guard, buf), result) =
     783      1126780 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     784      1126780 :             if let Ok(size) = result {
     785      1126778 :                 STORAGE_IO_SIZE
     786      1126778 :                     .with_label_values(&[
     787      1126778 :                         "write",
     788      1126778 :                         &self.tenant_id,
     789      1126778 :                         &self.shard_id,
     790      1126778 :                         &self.timeline_id,
     791      1126778 :                     ])
     792      1126778 :                     .add(size as i64);
     793      1126778 :             }
     794      1126780 :             (buf, result)
     795              :         })
     796      1126780 :     }
     797              : }
     798              : 
     799              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     800       496367 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     801       496367 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     802       496367 :     mut offset: u64,
     803       496367 :     mut read_at: F,
     804       496367 : ) -> (Buf, std::io::Result<()>)
     805       496367 : where
     806       496367 :     Buf: IoBufMut + Send,
     807       496367 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     808       496367 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     809       496367 : {
     810       992522 :     while buf.bytes_total() != 0 {
     811              :         let res;
     812       715185 :         (buf, res) = read_at(buf, offset).await;
     813            0 :         match res {
     814            2 :             Ok(0) => break,
     815       496155 :             Ok(n) => {
     816       496155 :                 buf = buf.slice(n..);
     817       496155 :                 offset += n as u64;
     818       496155 :             }
     819            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     820            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     821              :         }
     822              :     }
     823              :     // NB: don't use `buf.is_empty()` here; it is from the
     824              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     825              :     // returned by it only covers the initialized portion of `buf`.
     826              :     // Whereas we're interested in ensuring that we filled the entire
     827              :     // buffer that the user passed in.
     828       496367 :     if buf.bytes_total() != 0 {
     829            2 :         (
     830            2 :             buf.into_inner(),
     831            2 :             Err(std::io::Error::new(
     832            2 :                 std::io::ErrorKind::UnexpectedEof,
     833            2 :                 "failed to fill whole buffer",
     834            2 :             )),
     835            2 :         )
     836              :     } else {
     837       496365 :         assert_eq!(buf.len(), buf.bytes_total());
     838       496365 :         (buf.into_inner(), Ok(()))
     839              :     }
     840       496367 : }
     841              : 
     842              : #[cfg(test)]
     843              : mod test_read_exact_at_impl {
     844              : 
     845              :     use std::{collections::VecDeque, sync::Arc};
     846              : 
     847              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     848              : 
     849              :     use super::read_exact_at_impl;
     850              : 
     851              :     struct Expectation {
     852              :         offset: u64,
     853              :         bytes_total: usize,
     854              :         result: std::io::Result<Vec<u8>>,
     855              :     }
     856              :     struct MockReadAt {
     857              :         expectations: VecDeque<Expectation>,
     858              :     }
     859              : 
     860              :     impl MockReadAt {
     861           12 :         async fn read_at(
     862           12 :             &mut self,
     863           12 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     864           12 :             offset: u64,
     865           12 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     866           12 :             let exp = self
     867           12 :                 .expectations
     868           12 :                 .pop_front()
     869           12 :                 .expect("read_at called but we have no expectations left");
     870           12 :             assert_eq!(exp.offset, offset);
     871           12 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     872           12 :             match exp.result {
     873           12 :                 Ok(bytes) => {
     874           12 :                     assert!(bytes.len() <= buf.bytes_total());
     875           12 :                     buf.put_slice(&bytes);
     876           12 :                     (buf, Ok(bytes.len()))
     877              :                 }
     878            0 :                 Err(e) => (buf, Err(e)),
     879              :             }
     880           12 :         }
     881              :     }
     882              : 
     883              :     impl Drop for MockReadAt {
     884            8 :         fn drop(&mut self) {
     885            8 :             assert_eq!(self.expectations.len(), 0);
     886            8 :         }
     887              :     }
     888              : 
     889              :     #[tokio::test]
     890            2 :     async fn test_basic() {
     891            2 :         let buf = Vec::with_capacity(5).slice_full();
     892            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     893            2 :             expectations: VecDeque::from(vec![Expectation {
     894            2 :                 offset: 0,
     895            2 :                 bytes_total: 5,
     896            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     897            2 :             }]),
     898            2 :         }));
     899            2 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     900            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     901            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     902            2 :         })
     903            2 :         .await;
     904            2 :         assert!(res.is_ok());
     905            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     906            2 :     }
     907              : 
     908              :     #[tokio::test]
     909            2 :     async fn test_empty_buf_issues_no_syscall() {
     910            2 :         let buf = Vec::new().slice_full();
     911            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     912            2 :             expectations: VecDeque::new(),
     913            2 :         }));
     914            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     915            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     916            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     917            2 :         })
     918            2 :         .await;
     919            2 :         assert!(res.is_ok());
     920            2 :     }
     921              : 
     922              :     #[tokio::test]
     923            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     924            2 :         let buf = Vec::with_capacity(4).slice_full();
     925            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     926            2 :             expectations: VecDeque::from(vec![
     927            2 :                 Expectation {
     928            2 :                     offset: 0,
     929            2 :                     bytes_total: 4,
     930            2 :                     result: Ok(vec![b'a', b'b']),
     931            2 :                 },
     932            2 :                 Expectation {
     933            2 :                     offset: 2,
     934            2 :                     bytes_total: 2,
     935            2 :                     result: Ok(vec![b'c', b'd']),
     936            2 :                 },
     937            2 :             ]),
     938            2 :         }));
     939            4 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     940            4 :             let mock_read_at = Arc::clone(&mock_read_at);
     941            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     942            4 :         })
     943            2 :         .await;
     944            2 :         assert!(res.is_ok());
     945            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     946            2 :     }
     947              : 
     948              :     #[tokio::test]
     949            2 :     async fn test_eof_before_buffer_full() {
     950            2 :         let buf = Vec::with_capacity(3).slice_full();
     951            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     952            2 :             expectations: VecDeque::from(vec![
     953            2 :                 Expectation {
     954            2 :                     offset: 0,
     955            2 :                     bytes_total: 3,
     956            2 :                     result: Ok(vec![b'a']),
     957            2 :                 },
     958            2 :                 Expectation {
     959            2 :                     offset: 1,
     960            2 :                     bytes_total: 2,
     961            2 :                     result: Ok(vec![b'b']),
     962            2 :                 },
     963            2 :                 Expectation {
     964            2 :                     offset: 2,
     965            2 :                     bytes_total: 1,
     966            2 :                     result: Ok(vec![]),
     967            2 :                 },
     968            2 :             ]),
     969            2 :         }));
     970            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     971            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     972            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     973            6 :         })
     974            2 :         .await;
     975            2 :         let Err(err) = res else {
     976            2 :             panic!("should return an error");
     977            2 :         };
     978            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     979            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     980            2 :         // buffer contents on error are unspecified
     981            2 :     }
     982              : }
     983              : 
/// Read guard over an open-file slot that is expected to hold a file descriptor.
///
/// In the code visible here it is created by `VirtualFile::lock_file`, either
/// after observing that the slot's `file` is `Some`, or right after storing a
/// freshly opened file into the slot. The `AsRef<OwnedFd>` impl relies on that
/// and unwraps the `Option`.
struct FileGuard {
    /// Read lock on the slot's inner state; the descriptor lives in its `file` field.
    slot_guard: RwLockReadGuard<'static, SlotInner>,
}
     987              : 
     988              : impl AsRef<OwnedFd> for FileGuard {
     989      1627744 :     fn as_ref(&self) -> &OwnedFd {
     990      1627744 :         // This unwrap is safe because we only create `FileGuard`s
     991      1627744 :         // if we know that the file is Some.
     992      1627744 :         self.slot_guard.file.as_ref().unwrap()
     993      1627744 :     }
     994              : }
     995              : 
     996              : impl FileGuard {
     997              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     998       813925 :     fn with_std_file<F, R>(&self, with: F) -> R
     999       813925 :     where
    1000       813925 :         F: FnOnce(&File) -> R,
    1001       813925 :     {
    1002       813925 :         // SAFETY:
    1003       813925 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1004       813925 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
    1005       813925 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1006       813925 :         let res = with(&file);
    1007       813925 :         let _ = file.into_raw_fd();
    1008       813925 :         res
    1009       813925 :     }
    1010              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
    1011            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
    1012            4 :     where
    1013            4 :         F: FnOnce(&mut File) -> R,
    1014            4 :     {
    1015            4 :         // SAFETY:
    1016            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1017            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1018            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1019            4 :         let res = with(&mut file);
    1020            4 :         let _ = file.into_raw_fd();
    1021            4 :         res
    1022            4 :     }
    1023              : }
    1024              : 
    1025              : impl tokio_epoll_uring::IoFd for FileGuard {
    1026       813815 :     unsafe fn as_fd(&self) -> RawFd {
    1027       813815 :         let owned_fd: &OwnedFd = self.as_ref();
    1028       813815 :         owned_fd.as_raw_fd()
    1029       813815 :     }
    1030              : }
    1031              : 
    1032              : #[cfg(test)]
    1033              : impl VirtualFile {
    1034        20916 :     pub(crate) async fn read_blk(
    1035        20916 :         &self,
    1036        20916 :         blknum: u32,
    1037        20916 :         ctx: &RequestContext,
    1038        20916 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1039        20916 :         use crate::page_cache::PAGE_SZ;
    1040        20916 :         let slice = Vec::with_capacity(PAGE_SZ).slice_full();
    1041        20916 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1042        20916 :         let slice = self
    1043        20916 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1044        10619 :             .await?;
    1045        20916 :         Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
    1046        20916 :     }
    1047              : 
    1048          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
    1049          224 :         let mut tmp = vec![0; 128];
    1050              :         loop {
    1051          444 :             let slice = tmp.slice(..128);
    1052          444 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
    1053            2 :             match res {
    1054          222 :                 Ok(0) => return Ok(()),
    1055          220 :                 Ok(n) => {
    1056          220 :                     self.pos += n as u64;
    1057          220 :                     buf.extend_from_slice(&slice[..n]);
    1058          220 :                 }
    1059            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1060            2 :                 Err(e) => return Err(e),
    1061              :             }
    1062          220 :             tmp = slice.into_inner();
    1063              :         }
    1064          224 :     }
    1065              : }
    1066              : 
    1067              : impl Drop for VirtualFile {
    1068              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1069         4827 :     fn drop(&mut self) {
    1070         4827 :         let handle = self.handle.get_mut();
    1071         4827 : 
    1072         4827 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1073         4827 :             if slot_guard.tag == tag {
    1074         4827 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1075         4827 :                 // there is also operation "close-by-replace" for closes done on eviction for
    1076         4827 :                 // comparison.
    1077         4827 :                 if let Some(fd) = slot_guard.file.take() {
    1078         4276 :                     STORAGE_IO_TIME_METRIC
    1079         4276 :                         .get(StorageIoOperation::Close)
    1080         4276 :                         .observe_closure_duration(|| drop(fd));
    1081         4276 :                 }
    1082         4827 :             }
    1083         4827 :         }
    1084         4827 : 
    1085         4827 :         // We don't have async drop so we cannot directly await the lock here.
    1086         4827 :         // Instead, first do a best-effort attempt at closing the underlying
    1087         4827 :         // file descriptor by using `try_write`, and if that fails, spawn
    1088         4827 :         // a tokio task to do it asynchronously: we just want it to be
    1089         4827 :         // cleaned up eventually.
    1090         4827 :         // Most of the time, the `try_lock` should succeed though,
    1091         4827 :         // as we have `&mut self` access. In other words, if the slot
    1092         4827 :         // is still occupied by our file, there should be no access from
    1093         4827 :         // other I/O operations; the only other possible place to lock
    1094         4827 :         // the slot is the lock algorithm looking for free slots.
    1095         4827 :         let slot = &get_open_files().slots[handle.index];
    1096         4827 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1097         4827 :             clean_slot(slot, slot_guard, handle.tag);
    1098         4827 :         } else {
    1099            0 :             let tag = handle.tag;
    1100            0 :             tokio::spawn(async move {
    1101            0 :                 let slot_guard = slot.inner.write().await;
    1102            0 :                 clean_slot(slot, slot_guard, tag);
    1103            0 :             });
    1104            0 :         };
    1105         4827 :     }
    1106              : }
    1107              : 
    1108              : impl OwnedAsyncWriter for VirtualFile {
    1109              :     #[inline(always)]
    1110         6613 :     async fn write_all<Buf: IoBuf + Send>(
    1111         6613 :         &mut self,
    1112         6613 :         buf: FullSlice<Buf>,
    1113         6613 :         ctx: &RequestContext,
    1114         6613 :     ) -> std::io::Result<(usize, FullSlice<Buf>)> {
    1115         6613 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
    1116         6613 :         res.map(move |v| (v, buf))
    1117         6613 :     }
    1118              : }
    1119              : 
    1120              : impl OpenFiles {
    1121          188 :     fn new(num_slots: usize) -> OpenFiles {
    1122          188 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1123         1880 :         for _ in 0..num_slots {
    1124         1880 :             let slot = Slot {
    1125         1880 :                 recently_used: AtomicBool::new(false),
    1126         1880 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1127         1880 :             };
    1128         1880 :             slots.push(slot);
    1129         1880 :         }
    1130              : 
    1131          188 :         OpenFiles {
    1132          188 :             next: AtomicUsize::new(0),
    1133          188 :             slots: Box::leak(slots),
    1134          188 :         }
    1135          188 :     }
    1136              : }
    1137              : 
    1138              : ///
    1139              : /// Initialize the virtual file module. This must be called once at page
    1140              : /// server startup.
    1141              : ///
    1142              : #[cfg(not(test))]
    1143            0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
    1144            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1145            0 :         panic!("virtual_file::init called twice");
    1146            0 :     }
    1147            0 :     io_engine::init(engine);
    1148            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1149            0 : }
    1150              : 
/// Number of slots in the slot table that `get_open_files` creates on demand in
/// unit tests. Kept small; the tests in this file open far more files than this.
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1152              : 
    1153              : // Get a handle to the global slots array.
    1154      1638188 : fn get_open_files() -> &'static OpenFiles {
    1155      1638188 :     //
    1156      1638188 :     // In unit tests, page server startup doesn't happen and no one calls
    1157      1638188 :     // virtual_file::init(). Initialize it here, with a small array.
    1158      1638188 :     //
    1159      1638188 :     // This applies to the virtual file tests below, but all other unit
    1160      1638188 :     // tests too, so the virtual file facility is always usable in
    1161      1638188 :     // unit tests.
    1162      1638188 :     //
    1163      1638188 :     if cfg!(test) {
    1164      1638188 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1165              :     } else {
    1166            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1167              :     }
    1168      1638188 : }
    1169              : 
    1170              : #[cfg(test)]
    1171              : mod tests {
    1172              :     use crate::context::DownloadBehavior;
    1173              :     use crate::task_mgr::TaskKind;
    1174              : 
    1175              :     use super::*;
    1176              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1177              :     use owned_buffers_io::slice::SliceMutExt;
    1178              :     use rand::seq::SliceRandom;
    1179              :     use rand::thread_rng;
    1180              :     use rand::Rng;
    1181              :     use std::io::Write;
    1182              :     use std::os::unix::fs::FileExt;
    1183              :     use std::sync::Arc;
    1184              : 
    /// Either a `VirtualFile` or a native `File`, so that the same test code can
    /// be run against both and their results compared.
    enum MaybeVirtualFile {
        /// Operations are forwarded to a `VirtualFile`.
        VirtualFile(VirtualFile),
        /// Operations are forwarded to a native `File`.
        File(File),
    }
    1189              : 
    1190              :     impl From<VirtualFile> for MaybeVirtualFile {
    1191            6 :         fn from(vf: VirtualFile) -> Self {
    1192            6 :             MaybeVirtualFile::VirtualFile(vf)
    1193            6 :         }
    1194              :     }
    1195              : 
    1196              :     impl MaybeVirtualFile {
    1197          404 :         async fn read_exact_at(
    1198          404 :             &self,
    1199          404 :             mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
    1200          404 :             offset: u64,
    1201          404 :             ctx: &RequestContext,
    1202          404 :         ) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
    1203          404 :             match self {
    1204          202 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1205          202 :                 MaybeVirtualFile::File(file) => {
    1206          202 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1207          202 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1208              :                 }
    1209              :             }
    1210          404 :         }
    1211            8 :         async fn write_all_at<Buf: IoBuf + Send>(
    1212            8 :             &self,
    1213            8 :             buf: FullSlice<Buf>,
    1214            8 :             offset: u64,
    1215            8 :             ctx: &RequestContext,
    1216            8 :         ) -> Result<(), Error> {
    1217            8 :             match self {
    1218            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1219            4 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1220            4 :                     res
    1221              :                 }
    1222            4 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1223              :             }
    1224            8 :         }
    1225           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1226           36 :             match self {
    1227           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1228           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1229              :             }
    1230           36 :         }
    1231            8 :         async fn write_all<Buf: IoBuf + Send>(
    1232            8 :             &mut self,
    1233            8 :             buf: FullSlice<Buf>,
    1234            8 :             ctx: &RequestContext,
    1235            8 :         ) -> Result<(), Error> {
    1236            8 :             match self {
    1237            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1238            4 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1239            4 :                     res.map(|_| ())
    1240              :                 }
    1241            4 :                 MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
    1242              :             }
    1243            8 :         }
    1244              : 
    1245              :         // Helper function to slurp contents of a file, starting at the current position,
    1246              :         // into a string
    1247          442 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1248          442 :             use std::io::Read;
    1249          442 :             let mut buf = String::new();
    1250          442 :             match self {
    1251          224 :                 MaybeVirtualFile::VirtualFile(file) => {
    1252          224 :                     let mut buf = Vec::new();
    1253          226 :                     file.read_to_end(&mut buf, ctx).await?;
    1254          222 :                     return Ok(String::from_utf8(buf).unwrap());
    1255              :                 }
    1256          218 :                 MaybeVirtualFile::File(file) => {
    1257          218 :                     file.read_to_string(&mut buf)?;
    1258              :                 }
    1259              :             }
    1260          216 :             Ok(buf)
    1261          442 :         }
    1262              : 
    1263              :         // Helper function to slurp a portion of a file into a string
    1264          404 :         async fn read_string_at(
    1265          404 :             &mut self,
    1266          404 :             pos: u64,
    1267          404 :             len: usize,
    1268          404 :             ctx: &RequestContext,
    1269          404 :         ) -> Result<String, Error> {
    1270          404 :             let slice = Vec::with_capacity(len).slice_full();
    1271          404 :             assert_eq!(slice.bytes_total(), len);
    1272          404 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1273          404 :             let vec = slice.into_inner();
    1274          404 :             assert_eq!(vec.len(), len);
    1275          404 :             Ok(String::from_utf8(vec).unwrap())
    1276          404 :         }
    1277              :     }
    1278              : 
    1279              :     #[tokio::test]
    1280            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1281            2 :         // The real work is done in the test_files() helper function. This
    1282            2 :         // allows us to run the same set of tests against a native File, and
    1283            2 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1284            2 :         // but this allows us to verify that the operations return the same
    1285            2 :         // results with VirtualFiles as with native Files. (Except that with
    1286            2 :         // native files, you will run out of file descriptors if the ulimit
    1287            2 :         // is low enough.)
    1288            2 :         struct A;
    1289            2 : 
    1290            2 :         impl Adapter for A {
    1291          206 :             async fn open(
    1292          206 :                 path: Utf8PathBuf,
    1293          206 :                 opts: OpenOptions,
    1294          206 :                 ctx: &RequestContext,
    1295          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1296          206 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1297          206 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1298          206 :             }
    1299            2 :         }
    1300          530 :         test_files::<A>("virtual_files").await
    1301            2 :     }
    1302              : 
    1303              :     #[tokio::test]
    1304            2 :     async fn test_physical_files() -> anyhow::Result<()> {
    1305            2 :         struct B;
    1306            2 : 
    1307            2 :         impl Adapter for B {
    1308          206 :             async fn open(
    1309          206 :                 path: Utf8PathBuf,
    1310          206 :                 opts: OpenOptions,
    1311          206 :                 _ctx: &RequestContext,
    1312          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1313          206 :                 Ok(MaybeVirtualFile::File({
    1314          206 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1315          206 :                     File::from(owned_fd)
    1316            2 :                 }))
    1317          206 :             }
    1318            2 :         }
    1319            2 : 
    1320          104 :         test_files::<B>("physical_files").await
    1321            2 :     }
    1322              : 
    /// This is essentially a closure which returns a MaybeVirtualFile, but because Rust edition
    /// 2024 is not yet out with new lifetime capture or outlives rules, this is an async function
    /// in a trait, which benefits from the new lifetime capture rules already.
    trait Adapter {
        /// Opens `path` with the given `opts` and returns the opened file wrapped
        /// in a `MaybeVirtualFile`.
        async fn open(
            path: Utf8PathBuf,
            opts: OpenOptions,
            ctx: &RequestContext,
        ) -> Result<MaybeVirtualFile, anyhow::Error>;
    }
    1333              : 
    /// Shared test scenario, run once per `Adapter` implementation.
    ///
    /// Exercises write/read permission errors, sequential reads, seeks (including
    /// invalid ones), positional reads and writes, and finally opens the same file
    /// many more times than `TEST_MAX_FILE_DESCRIPTORS` before checking that all
    /// handles still work. `testname` selects the test directory.
    async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    where
        A: Adapter,
    {
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        let testdir = crate::config::PageServerConf::test_repo_dir(testname);
        std::fs::create_dir_all(&testdir)?;

        // Create file_a (write-only, truncated) and put "foobar" into it.
        let path_a = testdir.join("file_a");
        let mut file_a = A::open(
            path_a.clone(),
            OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .to_owned(),
            &ctx,
        )
        .await?;
        file_a
            .write_all(b"foobar".to_vec().slice_len(), &ctx)
            .await?;

        // cannot read from a file opened in write-only mode
        let _ = file_a.read_string(&ctx).await.unwrap_err();

        // Close the file and re-open for reading
        // NOTE(review): shadowing does not drop the previous `file_a` binding; the
        // write-only handle stays alive until this function returns.
        let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;

        // cannot write to a file opened in read-only mode
        let _ = file_a
            .write_all(b"bar".to_vec().slice_len(), &ctx)
            .await
            .unwrap_err();

        // Try simple read
        assert_eq!("foobar", file_a.read_string(&ctx).await?);

        // It's positioned at the EOF now.
        assert_eq!("", file_a.read_string(&ctx).await?);

        // Test seeks.
        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
        assert_eq!("oobar", file_a.read_string(&ctx).await?);

        assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
        assert_eq!("ar", file_a.read_string(&ctx).await?);

        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
        assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
        assert_eq!("bar", file_a.read_string(&ctx).await?);

        // The previous read left the position at EOF (6), so -5 lands on offset 1.
        assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
        assert_eq!("oobar", file_a.read_string(&ctx).await?);

        // Test erroneous seeks to before byte 0
        file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
        file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();

        // the erroneous seek should have left the position unchanged
        assert_eq!("oobar", file_a.read_string(&ctx).await?);

        // Create another test file, and try FileExt functions on it.
        let path_b = testdir.join("file_b");
        let mut file_b = A::open(
            path_b.clone(),
            OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .to_owned(),
            &ctx,
        )
        .await?;
        // Write the two halves out of order; the file ends up containing "FOOBAR".
        file_b
            .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
            .await?;
        file_b
            .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
            .await?;

        assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");

        // Open a lot of files, enough to cause some evictions. (Or to be precise,
        // open the same file many times. The effect is the same.)
        //
        // leave file_a positioned at offset 1 before we start
        assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);

        let mut vfiles = Vec::new();
        for _ in 0..100 {
            let mut vfile = A::open(
                path_b.clone(),
                OpenOptions::new().read(true).to_owned(),
                &ctx,
            )
            .await?;
            assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
            vfiles.push(vfile);
        }

        // make sure we opened enough files to definitely cause evictions.
        assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);

        // The underlying file descriptor for 'file_a' should be closed now. Try to read
        // from it again. We left the file positioned at offset 1 above.
        assert_eq!("oobar", file_a.read_string(&ctx).await?);

        // Check that all the other FDs still work too. Use them in random order for
        // good measure.
        vfiles.as_mut_slice().shuffle(&mut thread_rng());
        for vfile in vfiles.iter_mut() {
            assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
        }

        Ok(())
    }
    1453              : 
    /// Test using VirtualFiles from many threads concurrently. This tests both using
    /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    /// VirtualFile from multiple threads concurrently.
    #[tokio::test]
    async fn test_vfile_concurrency() -> Result<(), Error> {
        // Size of the test file, and of every read issued against it.
        const SIZE: usize = 8 * 1024;
        // Number of times the test file is opened as a VirtualFile.
        const VIRTUAL_FILES: usize = 100;
        // Number of runtime worker threads, and also of reader tasks spawned.
        const THREADS: usize = 100;
        // Expected file contents: SIZE bytes of 0xAD.
        const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
        std::fs::create_dir_all(&testdir)?;

        // Create a test file.
        let test_file_path = testdir.join("concurrency_test_file");
        {
            let file = File::create(&test_file_path)?;
            file.write_all_at(&SAMPLE, 0)?;
        }

        // Open the file many times.
        let mut files = Vec::new();
        for _ in 0..VIRTUAL_FILES {
            let f = VirtualFile::open_with_options(
                &test_file_path,
                OpenOptions::new().read(true),
                &ctx,
            )
            .await?;
            files.push(f);
        }
        let files = Arc::new(files);

        // Launch many threads, and use the virtual files concurrently in random order.
        // A dedicated multi-threaded runtime is built for this, separate from the
        // one that `#[tokio::test]` runs this function on.
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(THREADS)
            .thread_name("test_vfile_concurrency thread")
            .build()
            .unwrap();
        let mut hdls = Vec::new();
        for _threadno in 0..THREADS {
            let files = files.clone();
            let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
            let hdl = rt.spawn(async move {
                let mut buf = vec![0u8; SIZE];
                let mut rng = rand::rngs::OsRng;
                // Note: `1..1000` yields 999 iterations per task.
                for _ in 1..1000 {
                    // Pick a random handle and read the whole file through it,
                    // reusing the same buffer for every read.
                    let f = &files[rng.gen_range(0..files.len())];
                    buf = f
                        .read_exact_at(buf.slice_full(), 0, &ctx)
                        .await
                        .unwrap()
                        .into_inner();
                    assert!(buf == SAMPLE);
                }
            });
            hdls.push(hdl);
        }
        // Wait for every reader task; a panicked task surfaces as an error here.
        for hdl in hdls {
            hdl.await?;
        }
        // NOTE(review): the runtime is leaked instead of dropped, presumably because
        // dropping a tokio runtime from within an async context panics. Confirm;
        // `Runtime::shutdown_background` may be an alternative that avoids the leak.
        std::mem::forget(rt);

        Ok(())
    }
    1520              : 
    1521              :     #[tokio::test]
    1522            2 :     async fn test_atomic_overwrite_basic() {
    1523            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1524            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1525            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1526            2 : 
    1527            2 :         let path = testdir.join("myfile");
    1528            2 :         let tmp_path = testdir.join("myfile.tmp");
    1529            2 : 
    1530            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1531            2 :             .await
    1532            2 :             .unwrap();
    1533            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1534            2 :         let post = file.read_string(&ctx).await.unwrap();
    1535            2 :         assert_eq!(post, "foo");
    1536            2 :         assert!(!tmp_path.exists());
    1537            2 :         drop(file);
    1538            2 : 
    1539            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1540            2 :             .await
    1541            2 :             .unwrap();
    1542            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1543            2 :         let post = file.read_string(&ctx).await.unwrap();
    1544            2 :         assert_eq!(post, "bar");
    1545            2 :         assert!(!tmp_path.exists());
    1546            2 :         drop(file);
    1547            2 :     }
    1548              : 
    1549              :     #[tokio::test]
    1550            2 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1551            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1552            2 :         let testdir =
    1553            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1554            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1555            2 : 
    1556            2 :         let path = testdir.join("myfile");
    1557            2 :         let tmp_path = testdir.join("myfile.tmp");
    1558            2 : 
    1559            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1560            2 :         assert!(tmp_path.exists());
    1561            2 : 
    1562            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1563            2 :             .await
    1564            2 :             .unwrap();
    1565            2 : 
    1566            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1567            2 :         let post = file.read_string(&ctx).await.unwrap();
    1568            2 :         assert_eq!(post, "foo");
    1569            2 :         assert!(!tmp_path.exists());
    1570            2 :         drop(file);
    1571            2 :     }
    1572              : }
        

Generated by: LCOV version 2.1-beta