LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: fcb6851dc33e7b885c65f69bc628557fd22df639.info Lines: 93.4 % 978 913
Test Date: 2024-08-20 19:05:37 Functions: 91.2 % 204 186

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::context::RequestContext;
      14              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      15              : 
      16              : use crate::page_cache::{PageWriteGuard, PAGE_SZ};
      17              : use crate::tenant::TENANTS_SEGMENT_NAME;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use once_cell::sync::OnceCell;
      20              : use owned_buffers_io::io_buf_ext::FullSlice;
      21              : use pageserver_api::shard::TenantShardId;
      22              : use std::fs::File;
      23              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      24              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      25              : 
      26              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      27              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      28              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      29              : use tokio::time::Instant;
      30              : 
      31              : pub use pageserver_api::models::virtual_file as api;
      32              : pub(crate) mod io_engine;
      33              : pub use io_engine::feature_test as io_engine_feature_test;
      34              : pub use io_engine::io_engine_for_bench;
      35              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      36              : mod metadata;
      37              : mod open_options;
      38              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      39              : pub(crate) use api::DirectIoMode;
      40              : pub(crate) use io_engine::IoEngineKind;
      41              : pub(crate) use metadata::Metadata;
      42              : pub(crate) use open_options::*;
      43              : 
      44              : pub(crate) mod owned_buffers_io {
      45              :     //! Abstractions for IO with owned buffers.
      46              :     //!
      47              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      48              :     //! reason we need this abstraction.
      49              :     //!
      50              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      51              :     //! but for the time being we're proving out the primitives in the neon.git repo
      52              :     //! for faster iteration.
      53              : 
      54              :     pub(crate) mod io_buf_ext;
      55              :     pub(crate) mod slice;
      56              :     pub(crate) mod write;
      57              :     pub(crate) mod util {
      58              :         pub(crate) mod size_tracking_writer;
      59              :     }
      60              : }
      61              : 
      62              : ///
      63              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      64              : /// the underlying file is closed if the system is low on file descriptors,
      65              : /// and re-opened when it's accessed again.
      66              : ///
      67              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      68              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      69              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      70              : /// require a mutable reference, because they modify the "current position".
      71              : ///
      72              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      73              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      74              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      75              : /// 'tag' field is used to detect whether the handle still is valid or not.
      76              : ///
      77              : #[derive(Debug)]
      78              : pub struct VirtualFile {
      79              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      80              :     /// might contain our File, or it may be empty, or it may contain a File that
      81              :     /// belongs to a different VirtualFile.
      82              :     handle: RwLock<SlotHandle>,
      83              : 
      84              :     /// Current file position
      85              :     pos: u64,
      86              : 
      87              :     /// File path and options to use to open it.
      88              :     ///
      89              :     /// Note: this only contains the options needed to re-open it. For example,
      90              :     /// if a new file is created, we only pass the create flag when it's initially
      91              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      92              :     /// storing it here.
      93              :     pub path: Utf8PathBuf,
      94              :     open_options: OpenOptions,
      95              : 
      96              :     // These are strings because we only use them for metrics, and those expect strings.
      97              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      98              :     // strings.
      99              :     tenant_id: String,
     100              :     shard_id: String,
     101              :     timeline_id: String,
     102              : }
     103              : 
     104              : #[derive(Debug, PartialEq, Clone, Copy)]
     105              : struct SlotHandle {
     106              :     /// Index into OPEN_FILES.slots
     107              :     index: usize,
     108              : 
     109              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     110              :     /// been recycled and no longer contains the FD for this virtual file.
     111              :     tag: u64,
     112              : }
     113              : 
     114              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     115              : /// are currently open. Each slot in the array is protected by a separate lock,
     116              : /// so that different files can be accessed independently. The lock must be held
     117              : /// in write mode to replace the slot with a different file, but a read mode
     118              : /// is enough to operate on the file, whether you're reading or writing to it.
     119              : ///
     120              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     121              : /// the virtual_file::init() function. It must be called exactly once at page
     122              : /// server startup.
     123              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     124              : 
     125              : struct OpenFiles {
     126              :     slots: &'static [Slot],
     127              : 
     128              :     /// clock arm for the clock algorithm
     129              :     next: AtomicUsize,
     130              : }
     131              : 
     132              : struct Slot {
     133              :     inner: RwLock<SlotInner>,
     134              : 
     135              :     /// has this file been used since last clock sweep?
     136              :     recently_used: AtomicBool,
     137              : }
     138              : 
     139              : struct SlotInner {
     140              :     /// Counter that's incremented every time a different file is stored here.
     141              :     /// To avoid the ABA problem.
     142              :     tag: u64,
     143              : 
     144              :     /// the underlying file
     145              :     file: Option<OwnedFd>,
     146              : }
     147              : 
     148              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     149              : struct PageWriteGuardBuf {
     150              :     page: PageWriteGuard<'static>,
     151              : }
     152              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     153              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     154              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     155              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     156              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     157       247085 :     fn stable_ptr(&self) -> *const u8 {
     158       247085 :         self.page.as_ptr()
     159       247085 :     }
     160       463284 :     fn bytes_init(&self) -> usize {
     161       463284 :         self.page.len()
     162       463284 :     }
     163       185313 :     fn bytes_total(&self) -> usize {
     164       185313 :         self.page.len()
     165       185313 :     }
     166              : }
     167              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     168              : // hence it's safe to hand out the `stable_mut_ptr()`.
     169              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     170        92657 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     171        92657 :         self.page.as_mut_ptr()
     172        92657 :     }
     173              : 
     174        61771 :     unsafe fn set_init(&mut self, pos: usize) {
     175        61771 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     176        61771 :         assert!(pos <= self.page.len());
     177        61771 :     }
     178              : }
     179              : 
     180              : impl OpenFiles {
     181              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     182              :     ///
     183              :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
     184              :     /// 'recently_used' has been set. It's all ready for reuse.
     185       190000 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     186       190000 :         //
     187       190000 :         // Run the clock algorithm to find a slot to replace.
     188       190000 :         //
     189       190000 :         let num_slots = self.slots.len();
     190       190000 :         let mut retries = 0;
     191              :         let mut slot;
     192              :         let mut slot_guard;
     193              :         let index;
     194      2034449 :         loop {
     195      2034449 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     196      2034449 :             slot = &self.slots[next];
     197      2034449 : 
     198      2034449 :             // If the recently_used flag on this slot is set, continue the clock
     199      2034449 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     200      2034449 :             // lock, also continue the clock sweep.
     201      2034449 :             //
     202      2034449 :             // We only continue in this manner for a while, though. If we loop
     203      2034449 :             // through the array twice without finding a victim, just pick the
     204      2034449 :             // next slot and wait until we can reuse it. This way, we avoid
     205      2034449 :             // spinning in the extreme case that all the slots are busy with an
     206      2034449 :             // I/O operation.
     207      2034449 :             if retries < num_slots * 2 {
     208      1962879 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     209      1757297 :                     if let Ok(guard) = slot.inner.try_write() {
     210       118430 :                         slot_guard = guard;
     211       118430 :                         index = next;
     212       118430 :                         break;
     213      1638867 :                     }
     214       205582 :                 }
     215      1844449 :                 retries += 1;
     216              :             } else {
     217        71570 :                 slot_guard = slot.inner.write().await;
     218        71570 :                 index = next;
     219        71570 :                 break;
     220              :             }
     221              :         }
     222              : 
     223              :         //
     224              :         // We now have the victim slot locked. If it was in use previously, close the
     225              :         // old file.
     226              :         //
     227       190000 :         if let Some(old_file) = slot_guard.file.take() {
     228       185475 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     229       185475 :             // distinguish the two.
     230       185475 :             STORAGE_IO_TIME_METRIC
     231       185475 :                 .get(StorageIoOperation::CloseByReplace)
     232       185475 :                 .observe_closure_duration(|| drop(old_file));
     233       185475 :         }
     234              : 
     235              :         // Prepare the slot for reuse and return it
     236       190000 :         slot_guard.tag += 1;
     237       190000 :         slot.recently_used.store(true, Ordering::Relaxed);
     238       190000 :         (
     239       190000 :             SlotHandle {
     240       190000 :                 index,
     241       190000 :                 tag: slot_guard.tag,
     242       190000 :             },
     243       190000 :             slot_guard,
     244       190000 :         )
     245       190000 :     }
     246              : }
     247              : 
     248              : /// Identify error types that should always terminate the process.  Other
     249              : /// error types may be eligible for retry.
     250            0 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     251            0 :     use nix::errno::Errno::*;
     252            0 :     match e.raw_os_error().map(nix::errno::from_i32) {
     253              :         Some(EIO) => {
     254              :             // Terminate on EIO because we no longer trust the device to store
     255              :             // data safely, or to uphold persistence guarantees on fsync.
     256            0 :             true
     257              :         }
     258              :         Some(EROFS) => {
     259              :             // Terminate on EROFS because a filesystem is usually remounted
     260              :             // readonly when it has experienced some critical issue, so the same
     261              :             // logic as EIO applies.
     262            0 :             true
     263              :         }
     264              :         Some(EACCES) => {
     265              :             // Terminate on EACCES because we should always have permissions
     266              :             // for our own data dir: if we don't, then we can't do our job and
     267              :             // need administrative intervention to fix permissions.  Terminating
     268              :             // is the best way to make sure we stop cleanly rather than going
     269              :             // into infinite retry loops, and will make it clear to the outside
     270              :             // world that we need help.
     271            0 :             true
     272              :         }
     273              :         _ => {
     274              :             // Treat all other local file I/O errors as retryable.  This includes:
     275              :             // - ENOSPC: we stay up and wait for eviction to free some space
     276              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     277              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     278            0 :             false
     279              :         }
     280              :     }
     281            0 : }
     282              : 
     283              : /// Call this when the local filesystem gives us an error with an external
     284              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     285              : /// bad storage or bad configuration, and we can't fix that from inside
     286              : /// a running process.
     287            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     288            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     289            0 :     std::process::abort();
     290              : }
     291              : 
     292              : pub(crate) trait MaybeFatalIo<T> {
     293              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     294              :     fn fatal_err(self, context: &str) -> T;
     295              : }
     296              : 
     297              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     298              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     299              :     ///
     300              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     301              :     /// not on ENOSPC.
     302           22 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     303           22 :         if let Err(e) = &self {
     304            0 :             if is_fatal_io_error(e) {
     305            0 :                 on_fatal_io_error(e, context);
     306            0 :             }
     307           22 :         }
     308           22 :         self
     309           22 :     }
     310              : 
     311              :     /// Terminate the process on any I/O error.
     312              :     ///
     313              :     /// This is appropriate for reads on files that we know exist: they should always work.
     314         2038 :     fn fatal_err(self, context: &str) -> T {
     315         2038 :         match self {
     316         2038 :             Ok(v) => v,
     317            0 :             Err(e) => {
     318            0 :                 on_fatal_io_error(&e, context);
     319              :             }
     320              :         }
     321         2038 :     }
     322              : }
     323              : 
     324              : /// Observe duration for the given storage I/O operation
     325              : ///
     326              : /// Unlike `observe_closure_duration`, this supports async,
     327              : /// where "support" means that we measure wall clock time.
     328              : macro_rules! observe_duration {
     329              :     ($op:expr, $($body:tt)*) => {{
     330              :         let instant = Instant::now();
     331              :         let result = $($body)*;
     332              :         let elapsed = instant.elapsed().as_secs_f64();
     333              :         STORAGE_IO_TIME_METRIC
     334              :             .get($op)
     335              :             .observe(elapsed);
     336              :         result
     337              :     }}
     338              : }
     339              : 
     340              : macro_rules! with_file {
     341              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     342              :         let $ident = $this.lock_file().await?;
     343              :         observe_duration!($op, $($body)*)
     344              :     }};
     345              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     346              :         let mut $ident = $this.lock_file().await?;
     347              :         observe_duration!($op, $($body)*)
     348              :     }};
     349              : }
     350              : 
     351              : impl VirtualFile {
     352              :     /// Open a file in read-only mode. Like File::open.
     353         2196 :     pub async fn open<P: AsRef<Utf8Path>>(
     354         2196 :         path: P,
     355         2196 :         ctx: &RequestContext,
     356         2196 :     ) -> Result<VirtualFile, std::io::Error> {
     357         2196 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     358         2196 :     }
     359              : 
     360              :     /// Create a new file for writing. If the file exists, it will be truncated.
     361              :     /// Like File::create.
     362         1449 :     pub async fn create<P: AsRef<Utf8Path>>(
     363         1449 :         path: P,
     364         1449 :         ctx: &RequestContext,
     365         1449 :     ) -> Result<VirtualFile, std::io::Error> {
     366         1449 :         Self::open_with_options(
     367         1449 :             path.as_ref(),
     368         1449 :             OpenOptions::new().write(true).create(true).truncate(true),
     369         1449 :             ctx,
     370         1449 :         )
     371          747 :         .await
     372         1449 :     }
     373              : 
     374              :     /// Open a file with given options.
     375              :     ///
     376              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     377              :     /// they will be applied also when the file is subsequently re-opened, not only
     378              :     /// on the first time. Make sure that's sane!
     379         5617 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     380         5617 :         path: P,
     381         5617 :         open_options: &OpenOptions,
     382         5617 :         _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     383         5617 :     ) -> Result<VirtualFile, std::io::Error> {
     384         5617 :         let path_ref = path.as_ref();
     385         5617 :         let path_str = path_ref.to_string();
     386         5617 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     387         5617 :         let (tenant_id, shard_id, timeline_id) =
     388         5617 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     389         4123 :                 let tenant_shard_part = parts[parts.len() - 4];
     390         4123 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     391         4123 :                     Ok(tenant_shard_id) => (
     392         4123 :                         tenant_shard_id.tenant_id.to_string(),
     393         4123 :                         format!("{}", tenant_shard_id.shard_slug()),
     394         4123 :                     ),
     395              :                     Err(_) => {
     396              :                         // Malformed path: this ID is just for observability, so tolerate it
     397              :                         // and pass through
     398            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     399              :                     }
     400              :                 };
     401         4123 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     402              :             } else {
     403         1494 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     404              :             };
     405         5617 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     406              : 
     407              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     408              :         // where our caller doesn't get to use the returned VirtualFile before its
     409              :         // slot gets re-used by someone else.
     410         5617 :         let file = observe_duration!(StorageIoOperation::Open, {
     411         5617 :             open_options.open(path_ref.as_std_path()).await?
     412              :         });
     413              : 
     414              :         // Strip all options other than read and write.
     415              :         //
     416              :         // It would perhaps be nicer to check just for the read and write flags
     417              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     418              :         // only to set them.
     419         5617 :         let mut reopen_options = open_options.clone();
     420         5617 :         reopen_options.create(false);
     421         5617 :         reopen_options.create_new(false);
     422         5617 :         reopen_options.truncate(false);
     423         5617 : 
     424         5617 :         let vfile = VirtualFile {
     425         5617 :             handle: RwLock::new(handle),
     426         5617 :             pos: 0,
     427         5617 :             path: path_ref.to_path_buf(),
     428         5617 :             open_options: reopen_options,
     429         5617 :             tenant_id,
     430         5617 :             shard_id,
     431         5617 :             timeline_id,
     432         5617 :         };
     433         5617 : 
     434         5617 :         // TODO: Under pressure, it's likely the slot will get re-used and
     435         5617 :         // the underlying file closed before the caller gets around to using it.
     436         5617 :         // => https://github.com/neondatabase/neon/issues/6065
     437         5617 :         slot_guard.file.replace(file);
     438         5617 : 
     439         5617 :         Ok(vfile)
     440         5617 :     }
     441              : 
     442              :     /// Async version of [`::utils::crashsafe::overwrite`].
     443              :     ///
     444              :     /// # NB:
     445              :     ///
     446              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     447              :     /// it did at an earlier time.
     448              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     449           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     450           28 :         final_path: Utf8PathBuf,
     451           28 :         tmp_path: Utf8PathBuf,
     452           28 :         content: B,
     453           28 :     ) -> std::io::Result<()> {
     454           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     455           28 :         // See https://github.com/neondatabase/neon/issues/6663
     456           28 : 
     457           28 :         tokio::task::spawn_blocking(move || {
     458           28 :             let slice_storage;
     459           28 :             let content_len = content.bytes_init();
     460           28 :             let content = if content.bytes_init() > 0 {
     461           28 :                 slice_storage = Some(content.slice(0..content_len));
     462           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     463              :             } else {
     464            0 :                 &[]
     465              :             };
     466           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     467           28 :         })
     468           28 :         .await
     469           28 :         .expect("blocking task is never aborted")
     470           28 :     }
     471              : 
     472              :     /// Call File::sync_all() on the underlying File.
     473         2687 :     pub async fn sync_all(&self) -> Result<(), Error> {
     474         2687 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     475         2687 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     476         2687 :             res
     477              :         })
     478         2687 :     }
     479              : 
     480              :     /// Call File::sync_data() on the underlying File.
     481            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     482            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     483            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     484            0 :             res
     485              :         })
     486            0 :     }
     487              : 
     488         1684 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     489         1684 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     490         1684 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     491         1684 :             res
     492              :         })
     493         1684 :     }
     494              : 
     495              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     496              :     /// opens it and evicts some other File if necessary. The passed parameter is
     497              :     /// assumed to be a function available for the physical `File`.
     498              :     ///
     499              :     /// We are doing it via a macro as Rust doesn't support async closures that
     500              :     /// take on parameters with lifetimes.
     501      1627674 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     502      1627674 :         let open_files = get_open_files();
     503              : 
     504       184383 :         let mut handle_guard = {
     505              :             // Read the cached slot handle, and see if the slot that it points to still
     506              :             // contains our File.
     507              :             //
     508              :             // We only need to hold the handle lock while we read the current handle. If
     509              :             // another thread closes the file and recycles the slot for a different file,
     510              :             // we will notice that the handle we read is no longer valid and retry.
     511      1627674 :             let mut handle = *self.handle.read().await;
     512      1718995 :             loop {
     513      1718995 :                 // Check if the slot contains our File
     514      1718995 :                 {
     515      1718995 :                     let slot = &open_files.slots[handle.index];
     516      1718995 :                     let slot_guard = slot.inner.read().await;
     517      1718995 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     518              :                         // Found a cached file descriptor.
     519      1443291 :                         slot.recently_used.store(true, Ordering::Relaxed);
     520      1443291 :                         return Ok(FileGuard { slot_guard });
     521       275704 :                     }
     522              :                 }
     523              : 
     524              :                 // The slot didn't contain our File. We will have to open it ourselves,
     525              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     526              :                 // that no other thread will try to concurrently open the same file.
     527       275704 :                 let handle_guard = self.handle.write().await;
     528              : 
     529              :                 // If another thread changed the handle while we were not holding the lock,
     530              :                 // then the handle might now be valid again. Loop back to retry.
     531       275704 :                 if *handle_guard != handle {
     532        91321 :                     handle = *handle_guard;
     533        91321 :                     continue;
     534       184383 :                 }
     535       184383 :                 break handle_guard;
     536              :             }
     537              :         };
     538              : 
     539              :         // We need to open the file ourselves. The handle in the VirtualFile is
     540              :         // now locked in write-mode. Find a free slot to put it in.
     541       184383 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     542              : 
     543              :         // Re-open the physical file.
     544              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     545              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     546              :         // of the virtual file descriptor cache.
     547       184383 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     548       184383 :             self.open_options.open(self.path.as_std_path()).await?
     549              :         });
     550              : 
     551              :         // Store the File in the slot and update the handle in the VirtualFile
     552              :         // to point to it.
     553       184383 :         slot_guard.file.replace(file);
     554       184383 : 
     555       184383 :         *handle_guard = handle;
     556       184383 : 
     557       184383 :         return Ok(FileGuard {
     558       184383 :             slot_guard: slot_guard.downgrade(),
     559       184383 :         });
     560      1627674 :     }
     561              : 
     562           22 :     pub fn remove(self) {
     563           22 :         let path = self.path.clone();
     564           22 :         drop(self);
     565           22 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     566           22 :     }
     567              : 
     568         5086 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     569         5086 :         match pos {
     570         5076 :             SeekFrom::Start(offset) => {
     571         5076 :                 self.pos = offset;
     572         5076 :             }
     573            4 :             SeekFrom::End(offset) => {
     574            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     575            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     576              :             }
     577            6 :             SeekFrom::Current(offset) => {
     578            6 :                 let pos = self.pos as i128 + offset as i128;
     579            6 :                 if pos < 0 {
     580            2 :                     return Err(Error::new(
     581            2 :                         ErrorKind::InvalidInput,
     582            2 :                         "offset would be negative",
     583            2 :                     ));
     584            4 :                 }
     585            4 :                 if pos > u64::MAX as i128 {
     586            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     587            4 :                 }
     588            4 :                 self.pos = pos as u64;
     589              :             }
     590              :         }
     591         5082 :         Ok(self.pos)
     592         5086 :     }
     593              : 
     594              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     595              :     ///
     596              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
     597       496291 :     pub async fn read_exact_at<Buf>(
     598       496291 :         &self,
     599       496291 :         slice: Slice<Buf>,
     600       496291 :         offset: u64,
     601       496291 :         ctx: &RequestContext,
     602       496291 :     ) -> Result<Slice<Buf>, Error>
     603       496291 :     where
     604       496291 :         Buf: IoBufMut + Send,
     605       496291 :     {
     606       496291 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     607       496291 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     608              :         } else {
     609            0 :             None
     610              :         };
     611              : 
     612       496291 :         let original_bounds = slice.bounds();
     613       496291 :         let (buf, res) =
     614       701942 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     615       496291 :         let res = res.map(|_| buf.slice(original_bounds));
     616              : 
     617       496291 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     618       496291 :             if let Ok(slice) = &res {
     619       496291 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     620       496291 :                 assert_eq!(original_bounds, returned_bounds);
     621            0 :             }
     622            0 :         }
     623              : 
     624       496291 :         res
     625       496291 :     }
     626              : 
     627              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     628        61771 :     pub async fn read_exact_at_page(
     629        61771 :         &self,
     630        61771 :         page: PageWriteGuard<'static>,
     631        61771 :         offset: u64,
     632        61771 :         ctx: &RequestContext,
     633        61771 :     ) -> Result<PageWriteGuard<'static>, Error> {
     634        61771 :         let buf = PageWriteGuardBuf { page }.slice_full();
     635        61771 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     636        61771 :         self.read_exact_at(buf, offset, ctx)
     637        37324 :             .await
     638        61771 :             .map(|slice| slice.into_inner().page)
     639        61771 :     }
     640              : 
     641              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     642            4 :     pub async fn write_all_at<Buf: IoBuf + Send>(
     643            4 :         &self,
     644            4 :         buf: FullSlice<Buf>,
     645            4 :         mut offset: u64,
     646            4 :         ctx: &RequestContext,
     647            4 :     ) -> (FullSlice<Buf>, Result<(), Error>) {
     648            4 :         let buf = buf.into_raw_slice();
     649            4 :         let bounds = buf.bounds();
     650            4 :         let restore =
     651            4 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     652            4 :         let mut buf = buf;
     653            8 :         while !buf.is_empty() {
     654            4 :             let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
     655            4 :             buf = tmp.into_raw_slice();
     656            0 :             match res {
     657              :                 Ok(0) => {
     658            0 :                     return (
     659            0 :                         restore(buf),
     660            0 :                         Err(Error::new(
     661            0 :                             std::io::ErrorKind::WriteZero,
     662            0 :                             "failed to write whole buffer",
     663            0 :                         )),
     664            0 :                     );
     665              :                 }
     666            4 :                 Ok(n) => {
     667            4 :                     buf = buf.slice(n..);
     668            4 :                     offset += n as u64;
     669            4 :                 }
     670            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     671            0 :                 Err(e) => return (restore(buf), Err(e)),
     672              :             }
     673              :         }
     674            4 :         (restore(buf), Ok(()))
     675            4 :     }
     676              : 
     677              :     /// Writes `buf` to the file at the current offset.
     678              :     ///
     679              :     /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
     680      1126812 :     pub async fn write_all<Buf: IoBuf + Send>(
     681      1126812 :         &mut self,
     682      1126812 :         buf: FullSlice<Buf>,
     683      1126812 :         ctx: &RequestContext,
     684      1126812 :     ) -> (FullSlice<Buf>, Result<usize, Error>) {
     685      1126812 :         let buf = buf.into_raw_slice();
     686      1126812 :         let bounds = buf.bounds();
     687      1126812 :         let restore =
     688      1126812 :             |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
     689      1126812 :         let nbytes = buf.len();
     690      1126812 :         let mut buf = buf;
     691      2253584 :         while !buf.is_empty() {
     692      1126774 :             let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
     693      1126774 :             buf = tmp.into_raw_slice();
     694            2 :             match res {
     695              :                 Ok(0) => {
     696            0 :                     return (
     697            0 :                         restore(buf),
     698            0 :                         Err(Error::new(
     699            0 :                             std::io::ErrorKind::WriteZero,
     700            0 :                             "failed to write whole buffer",
     701            0 :                         )),
     702            0 :                     );
     703              :                 }
     704      1126772 :                 Ok(n) => {
     705      1126772 :                     buf = buf.slice(n..);
     706      1126772 :                 }
     707            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     708            2 :                 Err(e) => return (restore(buf), Err(e)),
     709              :             }
     710              :         }
     711      1126810 :         (restore(buf), Ok(nbytes))
     712      1126812 :     }
     713              : 
     714      1126774 :     async fn write<B: IoBuf + Send>(
     715      1126774 :         &mut self,
     716      1126774 :         buf: FullSlice<B>,
     717      1126774 :         ctx: &RequestContext,
     718      1126774 :     ) -> (FullSlice<B>, Result<usize, std::io::Error>) {
     719      1126774 :         let pos = self.pos;
     720      1126774 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     721      1126774 :         let n = match res {
     722      1126772 :             Ok(n) => n,
     723            2 :             Err(e) => return (buf, Err(e)),
     724              :         };
     725      1126772 :         self.pos += n as u64;
     726      1126772 :         (buf, Ok(n))
     727      1126774 :     }
     728              : 
     729       496521 :     pub(crate) async fn read_at<Buf>(
     730       496521 :         &self,
     731       496521 :         buf: tokio_epoll_uring::Slice<Buf>,
     732       496521 :         offset: u64,
     733       496521 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     734       496521 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     735       496521 :     where
     736       496521 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     737       496521 :     {
     738       496521 :         let file_guard = match self.lock_file().await {
     739       496521 :             Ok(file_guard) => file_guard,
     740            0 :             Err(e) => return (buf, Err(e)),
     741              :         };
     742              : 
     743       496521 :         observe_duration!(StorageIoOperation::Read, {
     744       496521 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     745       496521 :             if let Ok(size) = res {
     746       496519 :                 STORAGE_IO_SIZE
     747       496519 :                     .with_label_values(&[
     748       496519 :                         "read",
     749       496519 :                         &self.tenant_id,
     750       496519 :                         &self.shard_id,
     751       496519 :                         &self.timeline_id,
     752       496519 :                     ])
     753       496519 :                     .add(size as i64);
     754       496519 :             }
     755       496521 :             (buf, res)
     756              :         })
     757       496521 :     }
     758              : 
     759      1126778 :     async fn write_at<B: IoBuf + Send>(
     760      1126778 :         &self,
     761      1126778 :         buf: FullSlice<B>,
     762      1126778 :         offset: u64,
     763      1126778 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     764      1126778 :     ) -> (FullSlice<B>, Result<usize, Error>) {
     765      1126778 :         let file_guard = match self.lock_file().await {
     766      1126778 :             Ok(file_guard) => file_guard,
     767            0 :             Err(e) => return (buf, Err(e)),
     768              :         };
     769      1126778 :         observe_duration!(StorageIoOperation::Write, {
     770      1126778 :             let ((_file_guard, buf), result) =
     771      1126778 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     772      1126778 :             if let Ok(size) = result {
     773      1126776 :                 STORAGE_IO_SIZE
     774      1126776 :                     .with_label_values(&[
     775      1126776 :                         "write",
     776      1126776 :                         &self.tenant_id,
     777      1126776 :                         &self.shard_id,
     778      1126776 :                         &self.timeline_id,
     779      1126776 :                     ])
     780      1126776 :                     .add(size as i64);
     781      1126776 :             }
     782      1126778 :             (buf, result)
     783              :         })
     784      1126778 :     }
     785              : }
     786              : 
     787              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     788       496299 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     789       496299 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     790       496299 :     mut offset: u64,
     791       496299 :     mut read_at: F,
     792       496299 : ) -> (Buf, std::io::Result<()>)
     793       496299 : where
     794       496299 :     Buf: IoBufMut + Send,
     795       496299 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     796       496299 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     797       496299 : {
     798       992386 :     while buf.bytes_total() != 0 {
     799              :         let res;
     800       701942 :         (buf, res) = read_at(buf, offset).await;
     801            0 :         match res {
     802            2 :             Ok(0) => break,
     803       496087 :             Ok(n) => {
     804       496087 :                 buf = buf.slice(n..);
     805       496087 :                 offset += n as u64;
     806       496087 :             }
     807            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     808            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     809              :         }
     810              :     }
     811              :     // NB: don't use `buf.is_empty()` here; it is from the
     812              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     813              :     // returned by it only covers the initialized portion of `buf`.
     814              :     // Whereas we're interested in ensuring that we filled the entire
     815              :     // buffer that the user passed in.
     816       496299 :     if buf.bytes_total() != 0 {
     817            2 :         (
     818            2 :             buf.into_inner(),
     819            2 :             Err(std::io::Error::new(
     820            2 :                 std::io::ErrorKind::UnexpectedEof,
     821            2 :                 "failed to fill whole buffer",
     822            2 :             )),
     823            2 :         )
     824              :     } else {
     825       496297 :         assert_eq!(buf.len(), buf.bytes_total());
     826       496297 :         (buf.into_inner(), Ok(()))
     827              :     }
     828       496299 : }
     829              : 
     830              : #[cfg(test)]
     831              : mod test_read_exact_at_impl {
     832              : 
     833              :     use std::{collections::VecDeque, sync::Arc};
     834              : 
     835              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     836              : 
     837              :     use super::read_exact_at_impl;
     838              : 
     839              :     struct Expectation {
     840              :         offset: u64,
     841              :         bytes_total: usize,
     842              :         result: std::io::Result<Vec<u8>>,
     843              :     }
     844              :     struct MockReadAt {
     845              :         expectations: VecDeque<Expectation>,
     846              :     }
     847              : 
     848              :     impl MockReadAt {
     849           12 :         async fn read_at(
     850           12 :             &mut self,
     851           12 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     852           12 :             offset: u64,
     853           12 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     854           12 :             let exp = self
     855           12 :                 .expectations
     856           12 :                 .pop_front()
     857           12 :                 .expect("read_at called but we have no expectations left");
     858           12 :             assert_eq!(exp.offset, offset);
     859           12 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     860           12 :             match exp.result {
     861           12 :                 Ok(bytes) => {
     862           12 :                     assert!(bytes.len() <= buf.bytes_total());
     863           12 :                     buf.put_slice(&bytes);
     864           12 :                     (buf, Ok(bytes.len()))
     865              :                 }
     866            0 :                 Err(e) => (buf, Err(e)),
     867              :             }
     868           12 :         }
     869              :     }
     870              : 
     871              :     impl Drop for MockReadAt {
     872            8 :         fn drop(&mut self) {
     873            8 :             assert_eq!(self.expectations.len(), 0);
     874            8 :         }
     875              :     }
     876              : 
     877              :     #[tokio::test]
     878            2 :     async fn test_basic() {
     879            2 :         let buf = Vec::with_capacity(5).slice_full();
     880            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     881            2 :             expectations: VecDeque::from(vec![Expectation {
     882            2 :                 offset: 0,
     883            2 :                 bytes_total: 5,
     884            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     885            2 :             }]),
     886            2 :         }));
     887            2 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     888            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     889            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     890            2 :         })
     891            2 :         .await;
     892            2 :         assert!(res.is_ok());
     893            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     894            2 :     }
     895              : 
     896              :     #[tokio::test]
     897            2 :     async fn test_empty_buf_issues_no_syscall() {
     898            2 :         let buf = Vec::new().slice_full();
     899            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     900            2 :             expectations: VecDeque::new(),
     901            2 :         }));
     902            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     903            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     904            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     905            2 :         })
     906            2 :         .await;
     907            2 :         assert!(res.is_ok());
     908            2 :     }
     909              : 
     910              :     #[tokio::test]
     911            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     912            2 :         let buf = Vec::with_capacity(4).slice_full();
     913            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     914            2 :             expectations: VecDeque::from(vec![
     915            2 :                 Expectation {
     916            2 :                     offset: 0,
     917            2 :                     bytes_total: 4,
     918            2 :                     result: Ok(vec![b'a', b'b']),
     919            2 :                 },
     920            2 :                 Expectation {
     921            2 :                     offset: 2,
     922            2 :                     bytes_total: 2,
     923            2 :                     result: Ok(vec![b'c', b'd']),
     924            2 :                 },
     925            2 :             ]),
     926            2 :         }));
     927            4 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     928            4 :             let mock_read_at = Arc::clone(&mock_read_at);
     929            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     930            4 :         })
     931            2 :         .await;
     932            2 :         assert!(res.is_ok());
     933            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     934            2 :     }
     935              : 
     936              :     #[tokio::test]
     937            2 :     async fn test_eof_before_buffer_full() {
     938            2 :         let buf = Vec::with_capacity(3).slice_full();
     939            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     940            2 :             expectations: VecDeque::from(vec![
     941            2 :                 Expectation {
     942            2 :                     offset: 0,
     943            2 :                     bytes_total: 3,
     944            2 :                     result: Ok(vec![b'a']),
     945            2 :                 },
     946            2 :                 Expectation {
     947            2 :                     offset: 1,
     948            2 :                     bytes_total: 2,
     949            2 :                     result: Ok(vec![b'b']),
     950            2 :                 },
     951            2 :                 Expectation {
     952            2 :                     offset: 2,
     953            2 :                     bytes_total: 1,
     954            2 :                     result: Ok(vec![]),
     955            2 :                 },
     956            2 :             ]),
     957            2 :         }));
     958            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     959            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     960            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     961            6 :         })
     962            2 :         .await;
     963            2 :         let Err(err) = res else {
     964            2 :             panic!("should return an error");
     965            2 :         };
     966            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     967            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     968            2 :         // buffer contents on error are unspecified
     969            2 :     }
     970              : }
     971              : 
     972              : struct FileGuard {
     973              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
     974              : }
     975              : 
     976              : impl AsRef<OwnedFd> for FileGuard {
     977      1627674 :     fn as_ref(&self) -> &OwnedFd {
     978      1627674 :         // This unwrap is safe because we only create `FileGuard`s
     979      1627674 :         // if we know that the file is Some.
     980      1627674 :         self.slot_guard.file.as_ref().unwrap()
     981      1627674 :     }
     982              : }
     983              : 
     984              : impl FileGuard {
     985              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     986       813882 :     fn with_std_file<F, R>(&self, with: F) -> R
     987       813882 :     where
     988       813882 :         F: FnOnce(&File) -> R,
     989       813882 :     {
     990       813882 :         // SAFETY:
     991       813882 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
     992       813882 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
     993       813882 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
     994       813882 :         let res = with(&file);
     995       813882 :         let _ = file.into_raw_fd();
     996       813882 :         res
     997       813882 :     }
     998              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     999            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
    1000            4 :     where
    1001            4 :         F: FnOnce(&mut File) -> R,
    1002            4 :     {
    1003            4 :         // SAFETY:
    1004            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1005            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1006            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1007            4 :         let res = with(&mut file);
    1008            4 :         let _ = file.into_raw_fd();
    1009            4 :         res
    1010            4 :     }
    1011              : }
    1012              : 
    1013              : impl tokio_epoll_uring::IoFd for FileGuard {
    1014       813788 :     unsafe fn as_fd(&self) -> RawFd {
    1015       813788 :         let owned_fd: &OwnedFd = self.as_ref();
    1016       813788 :         owned_fd.as_raw_fd()
    1017       813788 :     }
    1018              : }
    1019              : 
    1020              : #[cfg(test)]
    1021              : impl VirtualFile { // Helpers compiled only for tests.
    1022        20916 :     pub(crate) async fn read_blk( // Reads the PAGE_SZ-byte block number `blknum` into a freshly allocated Vec.
    1023        20916 :         &self,
    1024        20916 :         blknum: u32,
    1025        20916 :         ctx: &RequestContext,
    1026        20916 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1027        20916 :         use crate::page_cache::PAGE_SZ;
    1028        20916 :         let slice = Vec::with_capacity(PAGE_SZ).slice_full(); // the assert below checks the slice spans PAGE_SZ bytes
    1029        20916 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1030        20916 :         let slice = self
    1031        20916 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx) // byte offset; widened to u64 before multiplying
    1032        10619 :             .await?;
    1033        20916 :         Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
    1034        20916 :     }
    1035              : 
    1036          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> { // Appends bytes from self.pos onward to `buf`, advancing self.pos, until a read returns 0.
    1037          224 :         let mut tmp = vec![0; 128]; // 128-byte scratch buffer, reused on every iteration
    1038              :         loop {
    1039          444 :             let slice = tmp.slice(..128);
    1040          444 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
    1041            2 :             match res {
    1042          222 :                 Ok(0) => return Ok(()), // a zero-length read is treated as end of file
    1043          220 :                 Ok(n) => {
    1044          220 :                     self.pos += n as u64;
    1045          220 :                     buf.extend_from_slice(&slice[..n]); // only the first n bytes of the scratch buffer are valid
    1046          220 :                 }
    1047            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} // interrupted: fall through and retry the read
    1048            2 :                 Err(e) => return Err(e),
    1049              :             }
    1050          220 :             tmp = slice.into_inner(); // take the scratch buffer back for the next iteration
    1051              :         }
    1052          224 :     }
    1053              : }
    1054              : 
    1055              : impl Drop for VirtualFile {
    1056              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1057         4825 :     fn drop(&mut self) {
    1058         4825 :         let handle = self.handle.get_mut();
    1059         4825 : 
    1060         4825 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { // Closes the slot's file, but only if the slot still carries `tag`.
    1061         4825 :             if slot_guard.tag == tag { // a different tag means the slot is no longer ours; leave it untouched
    1062         4825 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1063         4825 :                 // This close is timed under StorageIoOperation::Close. There is also an operation
    1064         4825 :                 // "close-by-replace" for closes done on eviction, for comparison.
    1065         4825 :                 if let Some(fd) = slot_guard.file.take() {
    1066         4273 :                     STORAGE_IO_TIME_METRIC
    1067         4273 :                         .get(StorageIoOperation::Close)
    1068         4273 :                         .observe_closure_duration(|| drop(fd));
    1069         4273 :                 }
    1070         4825 :             }
    1071         4825 :         }
    1072         4825 : 
    1073         4825 :         // We don't have async drop so we cannot directly await the lock here.
    1074         4825 :         // Instead, first do a best-effort attempt at closing the underlying
    1075         4825 :         // file descriptor by using `try_write`, and if that fails, spawn
    1076         4825 :         // a tokio task to do it asynchronously: we just want it to be
    1077         4825 :         // cleaned up eventually.
    1078         4825 :         // Most of the time, the `try_write` should succeed though,
    1079         4825 :         // as we have `&mut self` access. In other words, if the slot
    1080         4825 :         // is still occupied by our file, there should be no access from
    1081         4825 :         // other I/O operations; the only other possible place to lock
    1082         4825 :         // the slot is the clock algorithm looking for free slots.
    1083         4825 :         let slot = &get_open_files().slots[handle.index];
    1084         4825 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1085         4825 :             clean_slot(slot, slot_guard, handle.tag);
    1086         4825 :         } else {
    1087            0 :             let tag = handle.tag; // copy the tag out so the spawned task does not borrow `self`
    1088            0 :             tokio::spawn(async move {
    1089            0 :                 let slot_guard = slot.inner.write().await;
    1090            0 :                 clean_slot(slot, slot_guard, tag);
    1091            0 :             });
    1092            0 :         };
    1093         4825 :     }
    1094              : }
    1095              : 
    1096              : impl OwnedAsyncWriter for VirtualFile { // Adapts VirtualFile's inherent write_all to the OwnedAsyncWriter trait.
    1097              :     #[inline(always)]
    1098         6613 :     async fn write_all<Buf: IoBuf + Send>(
    1099         6613 :         &mut self,
    1100         6613 :         buf: FullSlice<Buf>,
    1101         6613 :         ctx: &RequestContext,
    1102         6613 :     ) -> std::io::Result<(usize, FullSlice<Buf>)> {
    1103         6613 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; // explicit path selects the inherent method, which yields (buffer, result)
    1104         6613 :         res.map(move |v| (v, buf)) // on success, pair the usize with the returned buffer; on error the buffer is dropped
    1105         6613 :     }
    1106              : }
    1107              : 
    1108              : impl OpenFiles {
    1109          188 :     fn new(num_slots: usize) -> OpenFiles { // Builds a table of `num_slots` empty slots.
    1110          188 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1111         1880 :         for _ in 0..num_slots {
    1112         1880 :             let slot = Slot {
    1113         1880 :                 recently_used: AtomicBool::new(false),
    1114         1880 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }), // empty slot: tag 0 and no open file
    1115         1880 :             };
    1116         1880 :             slots.push(slot);
    1117         1880 :         }
    1118              : 
    1119          188 :         OpenFiles {
    1120          188 :             next: AtomicUsize::new(0),
    1121          188 :             slots: Box::leak(slots), // leaked: the slot array is never freed, so references to it stay valid
    1122          188 :         }
    1123          188 :     }
    1124              : }
    1125              : 
    1126              : ///
    1127              : /// Initialize the virtual file module. This must be called once at page
    1128              : /// server startup. Panics if it is called a second time.
    1129              : ///
    1130              : #[cfg(not(test))]
    1131            0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
    1132            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { // set() fails if OPEN_FILES was already initialized
    1133            0 :         panic!("virtual_file::init called twice");
    1134            0 :     }
    1135            0 :     io_engine::init(engine);
    1136            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); // publish the slot count as a metric
    1137            0 : }
    1138              : 
    1139              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10; // slot count used when get_open_files() initializes lazily under cfg!(test)
    1140              : 
    1141              : // Get a handle to the global slots array.
    1142      1638116 : fn get_open_files() -> &'static OpenFiles {
    1143      1638116 :     //
    1144      1638116 :     // In unit tests, page server startup doesn't happen and no one calls
    1145      1638116 :     // virtual_file::init(). Initialize it here, with a small array.
    1146      1638116 :     //
    1147      1638116 :     // This applies to the virtual file tests below, but all other unit
    1148      1638116 :     // tests too, so the virtual file facility is always usable in
    1149      1638116 :     // unit tests.
    1150      1638116 :     //
    1151      1638116 :     if cfg!(test) {
    1152      1638116 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS)) // first caller creates the table; later callers reuse it
    1153              :     } else {
    1154            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet") // outside tests, panics unless init() has already run
    1155              :     }
    1156      1638116 : }
    1157              : 
    1158              : #[cfg(test)]
    1159              : mod tests {
    1160              :     use crate::context::DownloadBehavior;
    1161              :     use crate::task_mgr::TaskKind;
    1162              : 
    1163              :     use super::*;
    1164              :     use owned_buffers_io::io_buf_ext::IoBufExt;
    1165              :     use owned_buffers_io::slice::SliceMutExt;
    1166              :     use rand::seq::SliceRandom;
    1167              :     use rand::thread_rng;
    1168              :     use rand::Rng;
    1169              :     use std::io::Write;
    1170              :     use std::os::unix::fs::FileExt;
    1171              :     use std::sync::Arc;
    1172              : 
    1173              :     enum MaybeVirtualFile { // Lets one test body drive either a VirtualFile or a plain std File.
    1174              :         VirtualFile(VirtualFile),
    1175              :         File(File),
    1176              :     }
    1177              : 
    1178              :     impl From<VirtualFile> for MaybeVirtualFile {
    1179            6 :         fn from(vf: VirtualFile) -> Self { // Wraps the file in the VirtualFile variant.
    1180            6 :             MaybeVirtualFile::VirtualFile(vf)
    1181            6 :         }
    1182              :     }
    1183              : 
    1184              :     impl MaybeVirtualFile { // Each method forwards to the matching operation of whichever backend the variant holds.
    1185          404 :         async fn read_exact_at(
    1186          404 :             &self,
    1187          404 :             mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
    1188          404 :             offset: u64,
    1189          404 :             ctx: &RequestContext,
    1190          404 :         ) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
    1191          404 :             match self {
    1192          203 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1193          202 :                 MaybeVirtualFile::File(file) => {
    1194          202 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed(); // std path needs a plain &mut [u8] view of the slice
    1195          202 :                     file.read_exact_at(rust_slice, offset).map(|()| slice) // hand the filled slice back on success
    1196              :                 }
    1197              :             }
    1198          404 :         }
    1199            8 :         async fn write_all_at<Buf: IoBuf + Send>(
    1200            8 :             &self,
    1201            8 :             buf: FullSlice<Buf>,
    1202            8 :             offset: u64,
    1203            8 :             ctx: &RequestContext,
    1204            8 :         ) -> Result<(), Error> {
    1205            8 :             match self {
    1206            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1207            4 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await; // the returned buffer is not needed here
    1208            4 :                     res
    1209              :                 }
    1210            4 :                 MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
    1211              :             }
    1212            8 :         }
    1213           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1214           36 :             match self {
    1215           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1216           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1217              :             }
    1218           36 :         }
    1219            8 :         async fn write_all<Buf: IoBuf + Send>(
    1220            8 :             &mut self,
    1221            8 :             buf: FullSlice<Buf>,
    1222            8 :             ctx: &RequestContext,
    1223            8 :         ) -> Result<(), Error> {
    1224            8 :             match self {
    1225            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1226            4 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1227            4 :                     res.map(|_| ()) // discard the success value so both arms return Result<(), Error>
    1228              :                 }
    1229            4 :                 MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
    1230              :             }
    1231            8 :         }
    1232              : 
    1233              :         // Helper function to slurp contents of a file, starting at the current position,
    1234              :         // into a string
    1235          442 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1236          442 :             use std::io::Read;
    1237          442 :             let mut buf = String::new();
    1238          442 :             match self {
    1239          224 :                 MaybeVirtualFile::VirtualFile(file) => {
    1240          224 :                     let mut buf = Vec::new(); // shadows the outer String; this arm collects raw bytes first
    1241          226 :                     file.read_to_end(&mut buf, ctx).await?;
    1242          222 :                     return Ok(String::from_utf8(buf).unwrap()); // panics if the bytes are not valid UTF-8
    1243              :                 }
    1244          218 :                 MaybeVirtualFile::File(file) => {
    1245          218 :                     file.read_to_string(&mut buf)?;
    1246              :                 }
    1247              :             }
    1248          216 :             Ok(buf)
    1249          442 :         }
    1250              : 
    1251              :         // Helper function to slurp a portion of a file into a string
    1252          404 :         async fn read_string_at(
    1253          404 :             &mut self,
    1254          404 :             pos: u64,
    1255          404 :             len: usize,
    1256          404 :             ctx: &RequestContext,
    1257          404 :         ) -> Result<String, Error> {
    1258          404 :             let slice = Vec::with_capacity(len).slice_full();
    1259          404 :             assert_eq!(slice.bytes_total(), len);
    1260          404 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1261          404 :             let vec = slice.into_inner();
    1262          404 :             assert_eq!(vec.len(), len); // the read must have filled the Vec to exactly `len` bytes
    1263          404 :             Ok(String::from_utf8(vec).unwrap()) // panics if the bytes are not valid UTF-8
    1264          404 :         }
    1265              :     }
    1266              : 
    1267              :     #[tokio::test]
    1268            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1269            2 :         // The real work is done in the test_files() helper function. This
    1270            2 :         // allows us to run the same set of tests against a native File, and
    1271            2 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1272            2 :         // but this allows us to verify that the operations return the same
    1273            2 :         // results with VirtualFiles as with native Files. (Except that with
    1274            2 :         // native files, you will run out of file descriptors if the ulimit
    1275            2 :         // is low enough.)
    1276            2 :         struct A;
    1277            2 : 
    1278            2 :         impl Adapter for A { // opens every file as a VirtualFile
    1279          206 :             async fn open(
    1280          206 :                 path: Utf8PathBuf,
    1281          206 :                 opts: OpenOptions,
    1282          206 :                 ctx: &RequestContext,
    1283          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1284          206 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1285          206 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1286          206 :             }
    1287            2 :         }
    1288          531 :         test_files::<A>("virtual_files").await
    1289            2 :     }
    1290              : 
    1291              :     #[tokio::test]
    1292            2 :     async fn test_physical_files() -> anyhow::Result<()> { // Runs the shared test_files() body against plain std Files.
    1293            2 :         struct B;
    1294            2 : 
    1295            2 :         impl Adapter for B { // opens every file as a std File
    1296          206 :             async fn open(
    1297          206 :                 path: Utf8PathBuf,
    1298          206 :                 opts: OpenOptions,
    1299          206 :                 _ctx: &RequestContext,
    1300          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1301          206 :                 Ok(MaybeVirtualFile::File({
    1302          206 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1303          206 :                     File::from(owned_fd) // the File takes ownership of the fd
    1304            2 :                 }))
    1305          206 :             }
    1306            2 :         }
    1307            2 : 
    1308          104 :         test_files::<B>("physical_files").await
    1309            2 :     }
    1310              : 
    1311              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
    1312              :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is an async function
    1313              :     /// in a trait, which benefits from the new lifetime capture rules already.
    1314              :     trait Adapter {
    1315              :         async fn open(
    1316              :             path: Utf8PathBuf,
    1317              :             opts: OpenOptions,
    1318              :             ctx: &RequestContext,
    1319              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1320              :     }
    1321              : 
    1322            4 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()> // Shared test body; `A` decides which kind of file gets opened.
    1323            4 :     where
    1324            4 :         A: Adapter,
    1325            4 :     {
    1326            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1327            4 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1328            4 :         std::fs::create_dir_all(&testdir)?;
    1329              : 
    1330            4 :         let path_a = testdir.join("file_a");
    1331            4 :         let mut file_a = A::open(
    1332            4 :             path_a.clone(),
    1333            4 :             OpenOptions::new()
    1334            4 :                 .write(true)
    1335            4 :                 .create(true)
    1336            4 :                 .truncate(true)
    1337            4 :                 .to_owned(),
    1338            4 :             &ctx,
    1339            4 :         )
    1340            4 :         .await?;
    1341            4 :         file_a
    1342            4 :             .write_all(b"foobar".to_vec().slice_len(), &ctx)
    1343            1 :             .await?;
    1344              : 
    1345              :         // cannot read from a file opened in write-only mode
    1346            4 :         let _ = file_a.read_string(&ctx).await.unwrap_err();
    1347              : 
    1348              :         // Close the file and re-open for reading
    1349            4 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // rebinding drops the write-only handle
    1350              : 
    1351              :         // cannot write to a file opened in read-only mode
    1352            4 :         let _ = file_a
    1353            4 :             .write_all(b"bar".to_vec().slice_len(), &ctx)
    1354            1 :             .await
    1355            4 :             .unwrap_err();
    1356            4 : 
    1357            4 :         // Try simple read
    1358            4 :         assert_eq!("foobar", file_a.read_string(&ctx).await?);
    1359              : 
    1360              :         // It's positioned at the EOF now.
    1361            4 :         assert_eq!("", file_a.read_string(&ctx).await?);
    1362              : 
    1363              :         // Test seeks.
    1364            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1365            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1366              : 
    1367            4 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); // the file holds 6 bytes ("foobar"), so End(-2) is offset 4
    1368            4 :         assert_eq!("ar", file_a.read_string(&ctx).await?);
    1369              : 
    1370            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1371            4 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1372            4 :         assert_eq!("bar", file_a.read_string(&ctx).await?);
    1373              : 
    1374            4 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); // the previous read ended at offset 6; 6 - 5 = 1
    1375            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1376              : 
    1377              :         // Test erroneous seeks to before byte 0
    1378            4 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1379            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1380            4 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1381            4 : 
    1382            4 :         // the erroneous seek should have left the position unchanged
    1383            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1384              : 
    1385              :         // Create another test file, and try FileExt functions on it.
    1386            4 :         let path_b = testdir.join("file_b");
    1387            4 :         let mut file_b = A::open(
    1388            4 :             path_b.clone(),
    1389            4 :             OpenOptions::new()
    1390            4 :                 .read(true)
    1391            4 :                 .write(true)
    1392            4 :                 .create(true)
    1393            4 :                 .truncate(true)
    1394            4 :                 .to_owned(),
    1395            4 :             &ctx,
    1396            4 :         )
    1397            2 :         .await?;
    1398            4 :         file_b
    1399            4 :             .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx) // written out of order: bytes 3..6 first
    1400            1 :             .await?;
    1401            4 :         file_b
    1402            4 :             .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx) // then bytes 0..3, giving "FOOBAR"
    1403            1 :             .await?;
    1404              : 
    1405            4 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1406              : 
    1407              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1408              :         // open the same file many times. The effect is the same.)
    1409              :         //
    1410              :         // leave file_a positioned at offset 1 before we start
    1411            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1412              : 
    1413            4 :         let mut vfiles = Vec::new();
    1414          404 :         for _ in 0..100 {
    1415          400 :             let mut vfile = A::open(
    1416          400 :                 path_b.clone(),
    1417          400 :                 OpenOptions::new().read(true).to_owned(),
    1418          400 :                 &ctx,
    1419          400 :             )
    1420          200 :             .await?;
    1421          400 :             assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
    1422          400 :             vfiles.push(vfile); // keep every handle alive so they all stay in use at once
    1423              :         }
    1424              : 
    1425              :         // make sure we opened enough files to definitely cause evictions.
    1426            4 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1427              : 
    1428              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1429              :         // from it again. We left the file positioned at offset 1 above.
    1430            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1431              : 
    1432              :         // Check that all the other FDs still work too. Use them in random order for
    1433              :         // good measure.
    1434            4 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1435          400 :         for vfile in vfiles.iter_mut() {
    1436          400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1437              :         }
    1438              : 
    1439            4 :         Ok(())
    1440            4 :     }
    1441              : 
    1442              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1443              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1444              :     /// VirtualFile from multiple threads concurrently.
    1445              :     #[tokio::test]
    1446            2 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1447            2 :         const SIZE: usize = 8 * 1024;
    1448            2 :         const VIRTUAL_FILES: usize = 100;
    1449            2 :         const THREADS: usize = 100;
    1450            2 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; // expected file contents: SIZE bytes of 0xAD
    1451            2 : 
    1452            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1453            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1454            2 :         std::fs::create_dir_all(&testdir)?;
    1455            2 : 
    1456            2 :         // Create a test file.
    1457            2 :         let test_file_path = testdir.join("concurrency_test_file");
    1458            2 :         {
    1459            2 :             let file = File::create(&test_file_path)?;
    1460            2 :             file.write_all_at(&SAMPLE, 0)?;
    1461            2 :         }
    1462            2 : 
    1463            2 :         // Open the file many times.
    1464            2 :         let mut files = Vec::new();
    1465          202 :         for _ in 0..VIRTUAL_FILES {
    1466          200 :             let f = VirtualFile::open_with_options(
    1467          200 :                 &test_file_path,
    1468          200 :                 OpenOptions::new().read(true),
    1469          200 :                 &ctx,
    1470          200 :             )
    1471          101 :             .await?;
    1472          200 :             files.push(f);
    1473            2 :         }
    1474            2 :         let files = Arc::new(files); // shared read-only across all spawned tasks
    1475            2 : 
    1476            2 :         // Launch many threads, and use the virtual files concurrently in random order.
    1477            2 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1478            2 :             .worker_threads(THREADS)
    1479            2 :             .thread_name("test_vfile_concurrency thread")
    1480            2 :             .build()
    1481            2 :             .unwrap();
    1482            2 :         let mut hdls = Vec::new();
    1483          202 :         for _threadno in 0..THREADS {
    1484          200 :             let files = files.clone();
    1485          200 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1486          200 :             let hdl = rt.spawn(async move {
    1487          200 :                 let mut buf = vec![0u8; SIZE];
    1488          200 :                 let mut rng = rand::rngs::OsRng;
    1489       200000 :                 for _ in 1..1000 { // note: 1..1000 is 999 iterations per task
    1490       199800 :                     let f = &files[rng.gen_range(0..files.len())];
    1491       199800 :                     buf = f
    1492       199800 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1493       545251 :                         .await
    1494       199800 :                         .unwrap()
    1495       199800 :                         .into_inner(); // recover the Vec so the same buffer is reused next iteration
    1496       199800 :                     assert!(buf == SAMPLE);
    1497            2 :                 }
    1498          200 :             });
    1499          200 :             hdls.push(hdl);
    1500          200 :         }
    1501          202 :         for hdl in hdls {
    1502          200 :             hdl.await?;
    1503            2 :         }
    1504            2 :         std::mem::forget(rt); // leaks the runtime instead of dropping it; presumably because dropping a runtime inside async context panics (TODO confirm)
    1505            2 : 
    1506            2 :         Ok(())
    1507            2 :     }
    1508              : 
    1509              :     #[tokio::test]
    1510            2 :     async fn test_atomic_overwrite_basic() { // crashsafe_overwrite creates the file, then replaces its contents; the temp file is gone each time.
    1511            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1512            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1513            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1514            2 : 
    1515            2 :         let path = testdir.join("myfile");
    1516            2 :         let tmp_path = testdir.join("myfile.tmp");
    1517            2 : 
    1518            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1519            2 :             .await
    1520            2 :             .unwrap();
    1521            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1522            2 :         let post = file.read_string(&ctx).await.unwrap();
    1523            2 :         assert_eq!(post, "foo");
    1524            2 :         assert!(!tmp_path.exists()); // the temporary file must not be left behind
    1525            2 :         drop(file);
    1526            2 : 
    1527            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) // second pass: the destination already exists
    1528            2 :             .await
    1529            2 :             .unwrap();
    1530            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1531            2 :         let post = file.read_string(&ctx).await.unwrap();
    1532            2 :         assert_eq!(post, "bar");
    1533            2 :         assert!(!tmp_path.exists());
    1534            2 :         drop(file);
    1535            2 :     }
    1536              : 
    1537              :     #[tokio::test]
    1538            2 :     async fn test_atomic_overwrite_preexisting_tmp() { // crashsafe_overwrite must succeed even when the temp path already exists.
    1539            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1540            2 :         let testdir =
    1541            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1542            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1543            2 : 
    1544            2 :         let path = testdir.join("myfile");
    1545            2 :         let tmp_path = testdir.join("myfile.tmp");
    1546            2 : 
    1547            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); // plant a stale temp file first
    1548            2 :         assert!(tmp_path.exists());
    1549            2 : 
    1550            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1551            2 :             .await
    1552            2 :             .unwrap();
    1553            2 : 
    1554            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1555            2 :         let post = file.read_string(&ctx).await.unwrap();
    1556            2 :         assert_eq!(post, "foo"); // the junk did not leak into the destination
    1557            2 :         assert!(!tmp_path.exists());
    1558            2 :         drop(file);
    1559            2 :     }
    1560              : }
        

Generated by: LCOV version 2.1-beta