LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info Lines: 93.0 % 976 908
Test Date: 2024-07-03 15:33:13 Functions: 90.5 % 199 180

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::context::RequestContext;
      14              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      15              : 
      16              : use crate::page_cache::{PageWriteGuard, PAGE_SZ};
      17              : use crate::tenant::TENANTS_SEGMENT_NAME;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use once_cell::sync::OnceCell;
      20              : use pageserver_api::shard::TenantShardId;
      21              : use std::fs::File;
      22              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      23              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      24              : 
      25              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      26              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      27              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      28              : use tokio::time::Instant;
      29              : 
      30              : pub use pageserver_api::models::virtual_file as api;
      31              : pub(crate) mod io_engine;
      32              : pub use io_engine::feature_test as io_engine_feature_test;
      33              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      34              : mod metadata;
      35              : mod open_options;
      36              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      37              : pub(crate) use io_engine::IoEngineKind;
      38              : pub(crate) use metadata::Metadata;
      39              : pub(crate) use open_options::*;
      40              : 
      41              : pub(crate) mod owned_buffers_io {
      42              :     //! Abstractions for IO with owned buffers.
      43              :     //!
      44              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      45              :     //! reason we need this abstraction.
      46              :     //!
      47              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      48              :     //! but for the time being we're proving out the primitives in the neon.git repo
      49              :     //! for faster iteration.
      50              : 
      51              :     pub(crate) mod slice;
      52              :     pub(crate) mod write;
      53              :     pub(crate) mod util {
      54              :         pub(crate) mod size_tracking_writer;
      55              :     }
      56              : }
      57              : 
      58              : ///
      59              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      60              : /// the underlying file is closed if the system is low on file descriptors,
      61              : /// and re-opened when it's accessed again.
      62              : ///
      63              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      64              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      65              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      66              : /// require a mutable reference, because they modify the "current position".
      67              : ///
      68              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      69              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      70              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      71              : /// 'tag' field is used to detect whether the handle still is valid or not.
      72              : ///
      73              : #[derive(Debug)]
      74              : pub struct VirtualFile {
      75              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      76              :     /// might contain our File, or it may be empty, or it may contain a File that
      77              :     /// belongs to a different VirtualFile.
      78              :     handle: RwLock<SlotHandle>,
      79              : 
      80              :     /// Current file position
      81              :     pos: u64,
      82              : 
      83              :     /// File path and options to use to open it.
      84              :     ///
      85              :     /// Note: this only contains the options needed to re-open it. For example,
      86              :     /// if a new file is created, we only pass the create flag when it's initially
      87              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      88              :     /// storing it here.
      89              :     pub path: Utf8PathBuf,
      90              :     open_options: OpenOptions,
      91              : 
      92              :     // These are strings because we only use them for metrics, and those expect strings.
      93              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      94              :     // strings.
      95              :     tenant_id: String,
      96              :     shard_id: String,
      97              :     timeline_id: String,
      98              : }
      99              : 
     100              : #[derive(Debug, PartialEq, Clone, Copy)]
     101              : struct SlotHandle {
     102              :     /// Index into OPEN_FILES.slots
     103              :     index: usize,
     104              : 
     105              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     106              :     /// been recycled and no longer contains the FD for this virtual file.
     107              :     tag: u64,
     108              : }
     109              : 
     110              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     111              : /// are currently open. Each slot in the array is protected by a separate lock,
     112              : /// so that different files can be accessed independently. The lock must be held
     113              : /// in write mode to replace the slot with a different file, but a read mode
     114              : /// is enough to operate on the file, whether you're reading or writing to it.
     115              : ///
     116              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     117              : /// the virtual_file::init() function. It must be called exactly once at page
     118              : /// server startup.
     119              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     120              : 
     121              : struct OpenFiles {
     122              :     slots: &'static [Slot],
     123              : 
     124              :     /// clock arm for the clock algorithm
     125              :     next: AtomicUsize,
     126              : }
     127              : 
     128              : struct Slot {
     129              :     inner: RwLock<SlotInner>,
     130              : 
     131              :     /// has this file been used since last clock sweep?
     132              :     recently_used: AtomicBool,
     133              : }
     134              : 
     135              : struct SlotInner {
     136              :     /// Counter that's incremented every time a different file is stored here.
     137              :     /// To avoid the ABA problem.
     138              :     tag: u64,
     139              : 
     140              :     /// the underlying file
     141              :     file: Option<OwnedFd>,
     142              : }
     143              : 
     144              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     145              : struct PageWriteGuardBuf {
     146              :     page: PageWriteGuard<'static>,
     147              : }
     148              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     149              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     150              : // Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
     151              : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
     152              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     153       456199 :     fn stable_ptr(&self) -> *const u8 {
     154       456199 :         self.page.as_ptr()
     155       456199 :     }
     156       855267 :     fn bytes_init(&self) -> usize {
     157       855267 :         self.page.len()
     158       855267 :     }
     159       341937 :     fn bytes_total(&self) -> usize {
     160       341937 :         self.page.len()
     161       341937 :     }
     162              : }
     163              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     164              : // hence it's safe to hand out the `stable_mut_ptr()`.
     165              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     166       171110 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     167       171110 :         self.page.as_mut_ptr()
     168       171110 :     }
     169              : 
     170       113979 :     unsafe fn set_init(&mut self, pos: usize) {
     171       113979 :         // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
     172       113979 :         assert!(pos <= self.page.len());
     173       113979 :     }
     174              : }
     175              : 
     176              : impl OpenFiles {
     177              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     178              :     ///
     179              :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     180              :     /// and recently_used has been set. It's all ready for reuse.
     181       195127 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     182       195127 :         //
     183       195127 :         // Run the clock algorithm to find a slot to replace.
     184       195127 :         //
     185       195127 :         let num_slots = self.slots.len();
     186       195127 :         let mut retries = 0;
     187              :         let mut slot;
     188              :         let mut slot_guard;
     189              :         let index;
     190      2552409 :         loop {
     191      2552409 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     192      2552409 :             slot = &self.slots[next];
     193      2552409 : 
     194      2552409 :             // If the recently_used flag on this slot is set, continue the clock
     195      2552409 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     196      2552409 :             // lock, also continue the clock sweep.
     197      2552409 :             //
     198      2552409 :             // We only continue in this manner for a while, though. If we loop
     199      2552409 :             // through the array twice without finding a victim, just pick the
     200      2552409 :             // next slot and wait until we can reuse it. This way, we avoid
     201      2552409 :             // spinning in the extreme case that all the slots are busy with an
     202      2552409 :             // I/O operation.
     203      2552409 :             if retries < num_slots * 2 {
     204      2454468 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     205      2245690 :                     if let Ok(guard) = slot.inner.try_write() {
     206        97186 :                         slot_guard = guard;
     207        97186 :                         index = next;
     208        97186 :                         break;
     209      2148504 :                     }
     210       208778 :                 }
     211      2357282 :                 retries += 1;
     212              :             } else {
     213        97941 :                 slot_guard = slot.inner.write().await;
     214        97941 :                 index = next;
     215        97941 :                 break;
     216              :             }
     217              :         }
     218              : 
     219              :         //
     220              :         // We now have the victim slot locked. If it was in use previously, close the
     221              :         // old file.
     222              :         //
     223       195127 :         if let Some(old_file) = slot_guard.file.take() {
     224       190711 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     225       190711 :             // distinguish the two.
     226       190711 :             STORAGE_IO_TIME_METRIC
     227       190711 :                 .get(StorageIoOperation::CloseByReplace)
     228       190711 :                 .observe_closure_duration(|| drop(old_file));
     229       190711 :         }
     230              : 
     231              :         // Prepare the slot for reuse and return it
     232       195127 :         slot_guard.tag += 1;
     233       195127 :         slot.recently_used.store(true, Ordering::Relaxed);
     234       195127 :         (
     235       195127 :             SlotHandle {
     236       195127 :                 index,
     237       195127 :                 tag: slot_guard.tag,
     238       195127 :             },
     239       195127 :             slot_guard,
     240       195127 :         )
     241       195127 :     }
     242              : }
     243              : 
     244              : /// Identify error types that should always terminate the process.  Other
     245              : /// error types may be eligible for retry.
     246            0 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     247            0 :     use nix::errno::Errno::*;
     248            0 :     match e.raw_os_error().map(nix::errno::from_i32) {
     249              :         Some(EIO) => {
     250              :             // Terminate on EIO because we no longer trust the device to store
     251              :             // data safely, or to uphold persistence guarantees on fsync.
     252            0 :             true
     253              :         }
     254              :         Some(EROFS) => {
     255              :             // Terminate on EROFS because a filesystem is usually remounted
     256              :             // readonly when it has experienced some critical issue, so the same
     257              :             // logic as EIO applies.
     258            0 :             true
     259              :         }
     260              :         Some(EACCES) => {
     261              :             // Terminate on EACCES because we should always have permissions
     262              :             // for our own data dir: if we don't, then we can't do our job and
     263              :             // need administrative intervention to fix permissions.  Terminating
     264              :             // is the best way to make sure we stop cleanly rather than going
     265              :             // into infinite retry loops, and will make it clear to the outside
     266              :             // world that we need help.
     267            0 :             true
     268              :         }
     269              :         _ => {
     270              :             // Treat all other local file I/O errors as retryable.  This includes:
     271              :             // - ENOSPC: we stay up and wait for eviction to free some space
     272              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     273              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     274            0 :             false
     275              :         }
     276              :     }
     277            0 : }
     278              : 
     279              : /// Call this when the local filesystem gives us an error with an external
     280              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     281              : /// bad storage or bad configuration, and we can't fix that from inside
     282              : /// a running process.
     283            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     284            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     285            0 :     std::process::abort();
     286              : }
     287              : 
     288              : pub(crate) trait MaybeFatalIo<T> {
     289              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     290              :     fn fatal_err(self, context: &str) -> T;
     291              : }
     292              : 
     293              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     294              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     295              :     ///
     296              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     297              :     /// not on ENOSPC.
     298           22 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     299           22 :         if let Err(e) = &self {
     300            0 :             if is_fatal_io_error(e) {
     301            0 :                 on_fatal_io_error(e, context);
     302            0 :             }
     303           22 :         }
     304           22 :         self
     305           22 :     }
     306              : 
     307              :     /// Terminate the process on any I/O error.
     308              :     ///
     309              :     /// This is appropriate for reads on files that we know exist: they should always work.
     310         2358 :     fn fatal_err(self, context: &str) -> T {
     311         2358 :         match self {
     312         2358 :             Ok(v) => v,
     313            0 :             Err(e) => {
     314            0 :                 on_fatal_io_error(&e, context);
     315              :             }
     316              :         }
     317         2358 :     }
     318              : }
     319              : 
     320              : /// Observe duration for the given storage I/O operation
     321              : ///
     322              : /// Unlike `observe_closure_duration`, this supports async,
     323              : /// where "support" means that we measure wall clock time.
     324              : macro_rules! observe_duration {
     325              :     ($op:expr, $($body:tt)*) => {{
     326              :         let instant = Instant::now();
     327              :         let result = $($body)*;
     328              :         let elapsed = instant.elapsed().as_secs_f64();
     329              :         STORAGE_IO_TIME_METRIC
     330              :             .get($op)
     331              :             .observe(elapsed);
     332              :         result
     333              :     }}
     334              : }
     335              : 
     336              : macro_rules! with_file {
     337              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     338              :         let $ident = $this.lock_file().await?;
     339              :         observe_duration!($op, $($body)*)
     340              :     }};
     341              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     342              :         let mut $ident = $this.lock_file().await?;
     343              :         observe_duration!($op, $($body)*)
     344              :     }};
     345              : }
     346              : 
     347              : impl VirtualFile {
     348              :     /// Open a file in read-only mode. Like File::open.
     349         2286 :     pub async fn open<P: AsRef<Utf8Path>>(
     350         2286 :         path: P,
     351         2286 :         ctx: &RequestContext,
     352         2286 :     ) -> Result<VirtualFile, std::io::Error> {
     353         2286 :         Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
     354         2286 :     }
     355              : 
     356              :     /// Create a new file for writing. If the file exists, it will be truncated.
     357              :     /// Like File::create.
     358         1359 :     pub async fn create<P: AsRef<Utf8Path>>(
     359         1359 :         path: P,
     360         1359 :         ctx: &RequestContext,
     361         1359 :     ) -> Result<VirtualFile, std::io::Error> {
     362         1359 :         Self::open_with_options(
     363         1359 :             path.as_ref(),
     364         1359 :             OpenOptions::new().write(true).create(true).truncate(true),
     365         1359 :             ctx,
     366         1359 :         )
     367          699 :         .await
     368         1359 :     }
     369              : 
     370              :     /// Open a file with given options.
     371              :     ///
     372              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     373              :     /// they will be applied also when the file is subsequently re-opened, not only
     374              :     /// on the first time. Make sure that's sane!
     375         5541 :     pub async fn open_with_options<P: AsRef<Utf8Path>>(
     376         5541 :         path: P,
     377         5541 :         open_options: &OpenOptions,
     378         5541 :         _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
     379         5541 :     ) -> Result<VirtualFile, std::io::Error> {
     380         5541 :         let path_ref = path.as_ref();
     381         5541 :         let path_str = path_ref.to_string();
     382         5541 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     383         5541 :         let (tenant_id, shard_id, timeline_id) =
     384         5541 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     385         3911 :                 let tenant_shard_part = parts[parts.len() - 4];
     386         3911 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     387         3911 :                     Ok(tenant_shard_id) => (
     388         3911 :                         tenant_shard_id.tenant_id.to_string(),
     389         3911 :                         format!("{}", tenant_shard_id.shard_slug()),
     390         3911 :                     ),
     391              :                     Err(_) => {
     392              :                         // Malformed path: this ID is just for observability, so tolerate it
     393              :                         // and pass through
     394            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     395              :                     }
     396              :                 };
     397         3911 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     398              :             } else {
     399         1630 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     400              :             };
     401         5541 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     402              : 
     403              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     404              :         // where our caller doesn't get to use the returned VirtualFile before its
     405              :         // slot gets re-used by someone else.
     406         5541 :         let file = observe_duration!(StorageIoOperation::Open, {
     407         5541 :             open_options.open(path_ref.as_std_path()).await?
     408              :         });
     409              : 
     410              :         // Strip all options other than read and write.
     411              :         //
     412              :         // It would perhaps be nicer to check just for the read and write flags
     413              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     414              :         // only to set them.
     415         5541 :         let mut reopen_options = open_options.clone();
     416         5541 :         reopen_options.create(false);
     417         5541 :         reopen_options.create_new(false);
     418         5541 :         reopen_options.truncate(false);
     419         5541 : 
     420         5541 :         let vfile = VirtualFile {
     421         5541 :             handle: RwLock::new(handle),
     422         5541 :             pos: 0,
     423         5541 :             path: path_ref.to_path_buf(),
     424         5541 :             open_options: reopen_options,
     425         5541 :             tenant_id,
     426         5541 :             shard_id,
     427         5541 :             timeline_id,
     428         5541 :         };
     429         5541 : 
     430         5541 :         // TODO: Under pressure, it's likely the slot will get re-used and
     431         5541 :         // the underlying file closed before they get around to using it.
     432         5541 :         // => https://github.com/neondatabase/neon/issues/6065
     433         5541 :         slot_guard.file.replace(file);
     434         5541 : 
     435         5541 :         Ok(vfile)
     436         5541 :     }
     437              : 
     438              :     /// Async version of [`::utils::crashsafe::overwrite`].
     439              :     ///
     440              :     /// # NB:
     441              :     ///
     442              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     443              :     /// it did at an earlier time.
     444              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     445           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     446           28 :         final_path: Utf8PathBuf,
     447           28 :         tmp_path: Utf8PathBuf,
     448           28 :         content: B,
     449           28 :     ) -> std::io::Result<()> {
     450           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     451           28 :         // See https://github.com/neondatabase/neon/issues/6663
     452           28 : 
     453           28 :         tokio::task::spawn_blocking(move || {
     454           28 :             let slice_storage;
     455           28 :             let content_len = content.bytes_init();
     456           28 :             let content = if content.bytes_init() > 0 {
     457           28 :                 slice_storage = Some(content.slice(0..content_len));
     458           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     459              :             } else {
     460            0 :                 &[]
     461              :             };
     462           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     463           28 :         })
     464           28 :         .await
     465           28 :         .expect("blocking task is never aborted")
     466           28 :     }
     467              : 
     468              :     /// Call File::sync_all() on the underlying File.
     469         2731 :     pub async fn sync_all(&self) -> Result<(), Error> {
     470         2731 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     471         2731 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     472         2731 :             res
     473              :         })
     474         2731 :     }
     475              : 
     476              :     /// Call File::sync_data() on the underlying File.
     477            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     478            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     479            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     480            0 :             res
     481              :         })
     482            0 :     }
     483              : 
     484         1568 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     485         1568 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     486         1568 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     487         1568 :             res
     488              :         })
     489         1568 :     }
     490              : 
     491              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     492              :     /// opens it and evicts some other File if necessary, and returns a
     493              :     /// [`FileGuard`] that holds a read lock on the slot containing the File.
     494              :     ///
     495              :     /// Fast path: the cached handle still matches its slot and the slot holds a
     496              :     /// file, so the slot is flagged as recently used and returned right away.
                            ///
                            /// Slow path: the write lock on `self.handle` is taken (re-checking the slot
                            /// if the handle changed in the meantime), a slot is obtained from
                            /// `find_victim_slot()`, the file is re-opened with `self.open_options` at
                            /// `self.path`, and both the slot and the cached handle are updated.
                            ///
                            /// # Errors
                            ///
                            /// Returns the error from re-opening the file. In that case the cached handle
                            /// is left unchanged, because it is only overwritten after a successful open.
     497      1532783 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     498      1532783 :         let open_files = get_open_files();
     499              : 
     500       189586 :         let mut handle_guard = {
     501              :             // Read the cached slot handle, and see if the slot that it points to still
     502              :             // contains our File.
     503              :             //
     504              :             // We only need to hold the handle lock while we read the current handle. If
     505              :             // another thread closes the file and recycles the slot for a different file,
     506              :             // we will notice that the handle we read is no longer valid and retry.
     507      1532783 :             let mut handle = *self.handle.read().await;
     508      1637209 :             loop {
     509      1637209 :                 // Check if the slot contains our File
     510      1637209 :                 {
     511      1637209 :                     let slot = &open_files.slots[handle.index];
     512      1637209 :                     let slot_guard = slot.inner.read().await;
     513      1637209 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     514              :                         // Found a cached file descriptor.
     515      1343197 :                         slot.recently_used.store(true, Ordering::Relaxed);
     516      1343197 :                         return Ok(FileGuard { slot_guard });
     517       294012 :                     }
     518              :                 }
     519              : 
     520              :                 // The slot didn't contain our File. We will have to open it ourselves,
     521              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     522              :                 // that no other thread will try to concurrently open the same file.
     523       294012 :                 let handle_guard = self.handle.write().await;
     524              : 
     525              :                 // If another thread changed the handle while we were not holding the lock,
     526              :                 // then the handle might now be valid again. Loop back to retry.
     527       294012 :                 if *handle_guard != handle {
     528       104426 :                     handle = *handle_guard;
     529       104426 :                     continue;
     530       189586 :                 }
     531       189586 :                 break handle_guard;
     532              :             }
     533              :         };
     534              : 
     535              :         // We need to open the file ourselves. The handle in the VirtualFile is
     536              :         // now locked in write-mode. Find a free slot to put it in.
     537       189586 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     538              : 
     539              :         // Re-open the physical file.
     540              :         // NB: we use StorageIoOperation::OpenAfterReplace for this to distinguish this
     541              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     542              :         // of the virtual file descriptor cache.
     543       189586 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     544       189586 :             self.open_options.open(self.path.as_std_path()).await?
     545              :         });
     546              : 
     547              :         // Store the File in the slot and update the handle in the VirtualFile
     548              :         // to point to it.
     549       189586 :         slot_guard.file.replace(file);
     550       189586 : 
     551       189586 :         *handle_guard = handle;
     552       189586 : 
                                // The slot was locked for writing while the file was installed; the
                                // returned guard only needs read access, hence the downgrade.
     553       189586 :         return Ok(FileGuard {
     554       189586 :             slot_guard: slot_guard.downgrade(),
     555       189586 :         });
     556      1532783 :     }
     557              : 
     558            6 :     pub fn remove(self) {
                                // Consumes the VirtualFile and deletes the file at its path.
                                // The path is cloned first because `self` is dropped before the removal.
                                // Panics (via `expect`) if `std::fs::remove_file` returns an error.
     559            6 :         let path = self.path.clone();
     560            6 :         drop(self);
     561            6 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     562            6 :     }
     563              : 
     564         4722 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
                                // Updates the position tracked in `self.pos` and returns the new value.
                                //
                                // - `Start`: the offset is stored as-is; the file itself is not touched.
                                // - `End`: the underlying std file is asked to seek relative to its end
                                //   and the position it reports is stored.
                                // - `Current`: the new position is computed from `self.pos`; a negative
                                //   or out-of-range result is rejected with `ErrorKind::InvalidInput`
                                //   and `self.pos` is left unchanged.
     565         4722 :         match pos {
     566         4712 :             SeekFrom::Start(offset) => {
     567         4712 :                 self.pos = offset;
     568         4712 :             }
     569            4 :             SeekFrom::End(offset) => {
     570            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     571            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     572              :             }
     573            6 :             SeekFrom::Current(offset) => {
                                        // Do the arithmetic in i128 so that neither the u64 position
                                        // nor the i64 offset can overflow before the range checks.
     574            6 :                 let pos = self.pos as i128 + offset as i128;
     575            6 :                 if pos < 0 {
     576            2 :                     return Err(Error::new(
     577            2 :                         ErrorKind::InvalidInput,
     578            2 :                         "offset would be negative",
     579            2 :                     ));
     580            4 :                 }
     581            4 :                 if pos > u64::MAX as i128 {
     582            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     583            4 :                 }
     584            4 :                 self.pos = pos as u64;
     585              :             }
     586              :         }
     587         4718 :         Ok(self.pos)
     588         4722 :     }
     589              : 
     590              :     /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
     591              :     ///
     592              :     /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
                            ///
                            /// The read loop itself lives in [`read_exact_at_impl`]; this method supplies
                            /// [`Self::read_at`] as the read primitive and re-applies the original bounds
                            /// to the buffer that comes back.
                            ///
                            /// # Errors
                            ///
                            /// Returns whatever error [`read_exact_at_impl`] reports. On error the buffer
                            /// is not handed back to the caller.
     593       411540 :     pub async fn read_exact_at<Buf>(
     594       411540 :         &self,
     595       411540 :         slice: Slice<Buf>,
     596       411540 :         offset: u64,
     597       411540 :         ctx: &RequestContext,
     598       411540 :     ) -> Result<Slice<Buf>, Error>
     599       411540 :     where
     600       411540 :         Buf: IoBufMut + Send,
     601       411540 :     {
                                // In debug builds, remember the start pointer and length of the input
                                // view so that the returned view can be checked against it below.
     602       411540 :         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
     603       411540 :             Some((slice.stable_ptr() as usize, slice.bytes_total()))
     604              :         } else {
     605            0 :             None
     606              :         };
     607              : 
     608       411540 :         let original_bounds = slice.bounds();
     609       411540 :         let (buf, res) =
     610       746421 :             read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
     611       411540 :         let res = res.map(|_| buf.slice(original_bounds));
     612              : 
     613       411540 :         if let Some(original_bounds) = assert_we_return_original_bounds {
     614       411540 :             if let Ok(slice) = &res {
     615       411540 :                 let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
     616       411540 :                 assert_eq!(original_bounds, returned_bounds);
     617            0 :             }
     618            0 :         }
     619              : 
     620       411540 :         res
     621       411540 :     }
     622              : 
     623              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
                            ///
                            /// The page is wrapped in a `PageWriteGuardBuf`, read into as one full
                            /// slice, and unwrapped again on success. In debug builds the slice is
                            /// asserted to be exactly `PAGE_SZ` bytes long.
                            ///
                            /// # Errors
                            ///
                            /// Returns the error from [`Self::read_exact_at`]; the page guard is not
                            /// returned in that case.
     624       113979 :     pub async fn read_exact_at_page(
     625       113979 :         &self,
     626       113979 :         page: PageWriteGuard<'static>,
     627       113979 :         offset: u64,
     628       113979 :         ctx: &RequestContext,
     629       113979 :     ) -> Result<PageWriteGuard<'static>, Error> {
     630       113979 :         let buf = PageWriteGuardBuf { page }.slice_full();
     631       113979 :         debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
     632       113979 :         self.read_exact_at(buf, offset, ctx)
     633        64358 :             .await
     634       113979 :             .map(|slice| slice.into_inner().page)
     635       113979 :     }
     636              : 
     637              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
                            //
                            // Writes `buf.slice(0..buf.bytes_init())` to the file starting at `offset`,
                            // calling `write_at` repeatedly until everything has been written.
                            //
                            // Returns the buffer underlying `buf` together with the outcome:
                            // - `Ok(())` once all bytes were written (or immediately for an empty buffer),
                            // - `ErrorKind::WriteZero` if a write reports zero bytes written,
                            // - any other error from `write_at`, except `ErrorKind::Interrupted`, which
                            //   causes the same write to be retried.
     638            4 :     pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     639            4 :         &self,
     640            4 :         buf: B,
     641            4 :         mut offset: u64,
     642            4 :         ctx: &RequestContext,
     643            4 :     ) -> (B::Buf, Result<(), Error>) {
     644            4 :         let buf_len = buf.bytes_init();
     645            4 :         if buf_len == 0 {
     646            0 :             return (Slice::into_inner(buf.slice_full()), Ok(()));
     647            4 :         }
     648            4 :         let mut buf = buf.slice(0..buf_len);
     649            8 :         while !buf.is_empty() {
     650              :             let res;
     651            4 :             (buf, res) = self.write_at(buf, offset, ctx).await;
     652            0 :             match res {
     653              :                 Ok(0) => {
     654            0 :                     return (
     655            0 :                         Slice::into_inner(buf),
     656            0 :                         Err(Error::new(
     657            0 :                             std::io::ErrorKind::WriteZero,
     658            0 :                             "failed to write whole buffer",
     659            0 :                         )),
     660            0 :                     );
     661              :                 }
     662            4 :                 Ok(n) => {
                                            // Partial write: continue with the remaining bytes at the
                                            // correspondingly advanced file offset.
     663            4 :                     buf = buf.slice(n..);
     664            4 :                     offset += n as u64;
     665            4 :                 }
     666            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     667            0 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     668              :             }
     669              :         }
     670            4 :         (Slice::into_inner(buf), Ok(()))
     671            4 :     }
     672              : 
     673              :     /// Writes `buf.slice(0..buf.bytes_init())`.
     674              :     /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
     675              :     /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
     676              :     /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
                            ///
                            /// The data is written at the current position of this `VirtualFile`, and the
                            /// position advances by the number of bytes each underlying write reports.
                            ///
                            /// # Errors
                            ///
                            /// Returns `ErrorKind::WriteZero` if a write reports zero bytes written, or
                            /// the first error from the underlying write other than
                            /// `ErrorKind::Interrupted` (which is retried). Bytes written before the
                            /// failure stay written and the position stays advanced past them.
     677      1116526 :     pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     678      1116526 :         &mut self,
     679      1116526 :         buf: B,
     680      1116526 :         ctx: &RequestContext,
     681      1116526 :     ) -> (B::Buf, Result<usize, Error>) {
     682      1116526 :         let nbytes = buf.bytes_init();
     683      1116526 :         if nbytes == 0 {
     684           34 :             return (Slice::into_inner(buf.slice_full()), Ok(0));
     685      1116492 :         }
     686      1116492 :         let mut buf = buf.slice(0..nbytes);
     687      2232982 :         while !buf.is_empty() {
     688              :             let res;
     689      1116492 :             (buf, res) = self.write(buf, ctx).await;
     690            2 :             match res {
     691              :                 Ok(0) => {
     692            0 :                     return (
     693            0 :                         Slice::into_inner(buf),
     694            0 :                         Err(Error::new(
     695            0 :                             std::io::ErrorKind::WriteZero,
     696            0 :                             "failed to write whole buffer",
     697            0 :                         )),
     698            0 :                     );
     699              :                 }
     700      1116490 :                 Ok(n) => {
     701      1116490 :                     buf = buf.slice(n..);
     702      1116490 :                 }
     703            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     704            2 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     705              :             }
     706              :         }
     707      1116490 :         (Slice::into_inner(buf), Ok(nbytes))
     708      1116526 :     }
     709              : 
     710      1116492 :     async fn write<B: IoBuf + Send>(
     711      1116492 :         &mut self,
     712      1116492 :         buf: Slice<B>,
     713      1116492 :         ctx: &RequestContext,
     714      1116492 :     ) -> (Slice<B>, Result<usize, std::io::Error>) {
                                // Performs a single `write_at` at the current position and, on success,
                                // advances the position by the number of bytes written. On error the
                                // position is left untouched. The buffer is handed back either way.
     715      1116492 :         let pos = self.pos;
     716      1116492 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     717      1116492 :         let n = match res {
     718      1116490 :             Ok(n) => n,
     719            2 :             Err(e) => return (buf, Err(e)),
     720              :         };
     721      1116490 :         self.pos += n as u64;
     722      1116490 :         (buf, Ok(n))
     723      1116492 :     }
     724              : 
     725       411984 :     pub(crate) async fn read_at<Buf>(
     726       411984 :         &self,
     727       411984 :         buf: tokio_epoll_uring::Slice<Buf>,
     728       411984 :         offset: u64,
     729       411984 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     730       411984 :     ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
     731       411984 :     where
     732       411984 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     733       411984 :     {
                                // Issues a single read at `offset` into `buf` through the IO engine and
                                // returns the buffer together with the number of bytes read. The read may
                                // be short; callers that need the buffer filled use `read_exact_at`.
                                //
                                // If the file cannot be locked/opened, the buffer is returned untouched
                                // along with that error.
     734       540741 :         let file_guard = match self.lock_file().await {
     735       411984 :             Ok(file_guard) => file_guard,
     736            0 :             Err(e) => return (buf, Err(e)),
     737              :         };
     738              : 
     739       411984 :         observe_duration!(StorageIoOperation::Read, {
     740       411984 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
                                    // Only successful reads contribute to the per-timeline IO size
                                    // metric, under the "read" label.
     741       411984 :             if let Ok(size) = res {
     742       411982 :                 STORAGE_IO_SIZE
     743       411982 :                     .with_label_values(&[
     744       411982 :                         "read",
     745       411982 :                         &self.tenant_id,
     746       411982 :                         &self.shard_id,
     747       411982 :                         &self.timeline_id,
     748       411982 :                     ])
     749       411982 :                     .add(size as i64);
     750       411982 :             }
     751       411984 :             (buf, res)
     752              :         })
     753       411984 :     }
     754              : 
     755      1116496 :     async fn write_at<B: IoBuf + Send>(
     756      1116496 :         &self,
     757      1116496 :         buf: Slice<B>,
     758      1116496 :         offset: u64,
     759      1116496 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     760      1116496 :     ) -> (Slice<B>, Result<usize, Error>) {
                                // Issues a single write of `buf` at `offset` through the IO engine and
                                // returns the buffer together with the number of bytes written. The
                                // write may be short; `write_all_at` loops until everything is written.
                                //
                                // If the file cannot be locked/opened, the buffer is returned untouched
                                // along with that error.
     761      1116496 :         let file_guard = match self.lock_file().await {
     762      1116496 :             Ok(file_guard) => file_guard,
     763            0 :             Err(e) => return (buf, Err(e)),
     764              :         };
     765      1116496 :         observe_duration!(StorageIoOperation::Write, {
     766      1116496 :             let ((_file_guard, buf), result) =
     767      1116496 :                 io_engine::get().write_at(file_guard, offset, buf).await;
                                    // Only successful writes contribute to the per-timeline IO size
                                    // metric, under the "write" label.
     768      1116496 :             if let Ok(size) = result {
     769      1116494 :                 STORAGE_IO_SIZE
     770      1116494 :                     .with_label_values(&[
     771      1116494 :                         "write",
     772      1116494 :                         &self.tenant_id,
     773      1116494 :                         &self.shard_id,
     774      1116494 :                         &self.timeline_id,
     775      1116494 :                     ])
     776      1116494 :                     .add(size as i64);
     777      1116494 :             }
     778      1116496 :             (buf, result)
     779              :         })
     780      1116496 :     }
     781              : }
     782              : 
     783              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
                        //
                        // Fills `buf` completely by calling `read_at(buf, offset)` repeatedly. After
                        // each successful call the buffer view is narrowed past the bytes just read and
                        // `offset` is advanced by the same amount. Nothing is read for an empty buffer.
                        //
                        // Returns the underlying buffer together with:
                        // - `Ok(())` if the whole buffer was filled,
                        // - `ErrorKind::UnexpectedEof` if a read returned 0 bytes before that,
                        // - any other error from `read_at`, except `ErrorKind::Interrupted`, which
                        //   causes the same read to be retried.
     784       411548 : pub async fn read_exact_at_impl<Buf, F, Fut>(
     785       411548 :     mut buf: tokio_epoll_uring::Slice<Buf>,
     786       411548 :     mut offset: u64,
     787       411548 :     mut read_at: F,
     788       411548 : ) -> (Buf, std::io::Result<()>)
     789       411548 : where
     790       411548 :     Buf: IoBufMut + Send,
     791       411548 :     F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
     792       411548 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
     793       411548 : {
     794       823098 :     while buf.bytes_total() != 0 {
     795              :         let res;
     796       746421 :         (buf, res) = read_at(buf, offset).await;
     797            0 :         match res {
                                    // A zero-length read means end of file; stop and let the check
                                    // below decide whether the buffer was filled.
     798            2 :             Ok(0) => break,
     799       411550 :             Ok(n) => {
     800       411550 :                 buf = buf.slice(n..);
     801       411550 :                 offset += n as u64;
     802       411550 :             }
     803            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     804            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     805              :         }
     806              :     }
     807              :     // NB: don't use `buf.is_empty()` here; it is from the
     808              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     809              :     // returned by it only covers the initialized portion of `buf`.
     810              :     // Whereas we're interested in ensuring that we filled the entire
     811              :     // buffer that the user passed in.
     812       411548 :     if buf.bytes_total() != 0 {
     813            2 :         (
     814            2 :             buf.into_inner(),
     815            2 :             Err(std::io::Error::new(
     816            2 :                 std::io::ErrorKind::UnexpectedEof,
     817            2 :                 "failed to fill whole buffer",
     818            2 :             )),
     819            2 :         )
     820              :     } else {
     821       411546 :         assert_eq!(buf.len(), buf.bytes_total());
     822       411546 :         (buf.into_inner(), Ok(()))
     823              :     }
     824       411548 : }
     825              : 
     826              : #[cfg(test)]
     827              : mod test_read_exact_at_impl {
                            // Unit tests for `read_exact_at_impl`, driven by a scripted mock in place
                            // of a real file: each test lists the exact sequence of `read_at` calls it
                            // expects and what each call should return.
     828              : 
     829              :     use std::{collections::VecDeque, sync::Arc};
     830              : 
     831              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     832              : 
     833              :     use super::read_exact_at_impl;
     834              : 
                            /// One expected `read_at` call: the offset and remaining buffer size it must
                            /// be invoked with, and the result (bytes or error) the mock answers with.
     835              :     struct Expectation {
     836              :         offset: u64,
     837              :         bytes_total: usize,
     838              :         result: std::io::Result<Vec<u8>>,
     839              :     }
                            /// Scripted stand-in for a file; expectations are consumed front to back.
     840              :     struct MockReadAt {
     841              :         expectations: VecDeque<Expectation>,
     842              :     }
     843              : 
     844              :     impl MockReadAt {
                                /// Pops the next expectation, asserts that the call matches it, and
                                /// answers with the scripted result. Panics if no expectation is left.
     845           12 :         async fn read_at(
     846           12 :             &mut self,
     847           12 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     848           12 :             offset: u64,
     849           12 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     850           12 :             let exp = self
     851           12 :                 .expectations
     852           12 :                 .pop_front()
     853           12 :                 .expect("read_at called but we have no expectations left");
     854           12 :             assert_eq!(exp.offset, offset);
     855           12 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     856           12 :             match exp.result {
     857           12 :                 Ok(bytes) => {
     858           12 :                     assert!(bytes.len() <= buf.bytes_total());
     859           12 :                     buf.put_slice(&bytes);
     860           12 :                     (buf, Ok(bytes.len()))
     861              :                 }
     862            0 :                 Err(e) => (buf, Err(e)),
     863              :             }
     864           12 :         }
     865              :     }
     866              : 
                            // Dropping the mock verifies that every scripted call was actually made.
     867              :     impl Drop for MockReadAt {
     868            8 :         fn drop(&mut self) {
     869            8 :             assert_eq!(self.expectations.len(), 0);
     870            8 :         }
     871              :     }
     872              : 
                            // A single read that fills the whole buffer.
     873              :     #[tokio::test]
     874            2 :     async fn test_basic() {
     875            2 :         let buf = Vec::with_capacity(5).slice_full();
     876            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     877            2 :             expectations: VecDeque::from(vec![Expectation {
     878            2 :                 offset: 0,
     879            2 :                 bytes_total: 5,
     880            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     881            2 :             }]),
     882            2 :         }));
     883            2 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     884            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     885            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     886            2 :         })
     887            2 :         .await;
     888            2 :         assert!(res.is_ok());
     889            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     890            2 :     }
     891              : 
                            // An empty buffer must succeed without calling `read_at` at all; the mock
                            // has no expectations, so any call would panic.
     892              :     #[tokio::test]
     893            2 :     async fn test_empty_buf_issues_no_syscall() {
     894            2 :         let buf = Vec::new().slice_full();
     895            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     896            2 :             expectations: VecDeque::new(),
     897            2 :         }));
     898            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     899            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     900            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     901            2 :         })
     902            2 :         .await;
     903            2 :         assert!(res.is_ok());
     904            2 :     }
     905              : 
                            // A short first read must be followed by a second read at the advanced
                            // offset with the correspondingly smaller remaining buffer.
     906              :     #[tokio::test]
     907            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     908            2 :         let buf = Vec::with_capacity(4).slice_full();
     909            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     910            2 :             expectations: VecDeque::from(vec![
     911            2 :                 Expectation {
     912            2 :                     offset: 0,
     913            2 :                     bytes_total: 4,
     914            2 :                     result: Ok(vec![b'a', b'b']),
     915            2 :                 },
     916            2 :                 Expectation {
     917            2 :                     offset: 2,
     918            2 :                     bytes_total: 2,
     919            2 :                     result: Ok(vec![b'c', b'd']),
     920            2 :                 },
     921            2 :             ]),
     922            2 :         }));
     923            4 :         let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     924            4 :             let mock_read_at = Arc::clone(&mock_read_at);
     925            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     926            4 :         })
     927            2 :         .await;
     928            2 :         assert!(res.is_ok());
     929            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     930            2 :     }
     931              : 
                            // A zero-length read before the buffer is full must surface as
                            // `UnexpectedEof` with the "failed to fill whole buffer" message.
     932              :     #[tokio::test]
     933            2 :     async fn test_eof_before_buffer_full() {
     934            2 :         let buf = Vec::with_capacity(3).slice_full();
     935            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     936            2 :             expectations: VecDeque::from(vec![
     937            2 :                 Expectation {
     938            2 :                     offset: 0,
     939            2 :                     bytes_total: 3,
     940            2 :                     result: Ok(vec![b'a']),
     941            2 :                 },
     942            2 :                 Expectation {
     943            2 :                     offset: 1,
     944            2 :                     bytes_total: 2,
     945            2 :                     result: Ok(vec![b'b']),
     946            2 :                 },
     947            2 :                 Expectation {
     948            2 :                     offset: 2,
     949            2 :                     bytes_total: 1,
     950            2 :                     result: Ok(vec![]),
     951            2 :                 },
     952            2 :             ]),
     953            2 :         }));
     954            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
     955            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     956            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     957            6 :         })
     958            2 :         .await;
     959            2 :         let Err(err) = res else {
     960            2 :             panic!("should return an error");
     961            2 :         };
     962            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     963            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     964            2 :         // buffer contents on error are unspecified
     965            2 :     }
     966              : }
     967              : 
     968              : struct FileGuard {
                            // Read lock on the slot that holds the open file. `lock_file` only builds
                            // a `FileGuard` for slots whose `file` is `Some`.
     969              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
     970              : }
     971              : 
     972              : impl AsRef<OwnedFd> for FileGuard {
                            /// Borrows the `OwnedFd` stored in the guarded slot.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the slot's `file` is `None`.
     973      1532783 :     fn as_ref(&self) -> &OwnedFd {
     974      1532783 :         // This unwrap is safe because we only create `FileGuard`s
     975      1532783 :         // if we know that the file is Some.
     976      1532783 :         self.slot_guard.file.as_ref().unwrap()
     977      1532783 :     }
     978              : }
     979              : 
     980              : impl FileGuard {
     981              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
                            ///
                            /// Runs `with` against a temporary `std::fs::File` view of the guarded file
                            /// descriptor and returns the closure's result. The temporary `File` is
                            /// turned back into a raw fd afterwards so that it does not close the
                            /// descriptor, which remains owned by the slot.
                            ///
                            /// NOTE(review): if `with` panics, `file` is dropped during unwinding and
                            /// closes a descriptor that the slot's `OwnedFd` still owns. Wrapping it in
                            /// `std::mem::ManuallyDrop` would avoid that; confirm whether closures
                            /// passed here can panic.
     982       766530 :     fn with_std_file<F, R>(&self, with: F) -> R
     983       766530 :     where
     984       766530 :         F: FnOnce(&File) -> R,
     985       766530 :     {
     986       766530 :         // SAFETY:
     987       766530 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
     988       766530 :         // - `&` usage below: `self` is `&`, hence the Rust type system guarantees there is no `&mut`
     989       766530 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
     990       766530 :         let res = with(&file);
     991       766530 :         let _ = file.into_raw_fd();
     992       766530 :         res
     993       766530 :     }
     994              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
                            ///
                            /// Same as [`Self::with_std_file`], but hands the closure a `&mut File`.
                            ///
                            /// NOTE(review): the same concern applies if `with` panics: the temporary
                            /// `File` would close the descriptor on unwind.
     995            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
     996            4 :     where
     997            4 :         F: FnOnce(&mut File) -> R,
     998            4 :     {
     999            4 :         // SAFETY:
    1000            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1001            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1002            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1003            4 :         let res = with(&mut file);
    1004            4 :         let _ = file.into_raw_fd();
    1005            4 :         res
    1006            4 :     }
    1007              : }
    1008              : 
    1009              : impl tokio_epoll_uring::IoFd for FileGuard {
                            /// Returns the raw file descriptor of the `OwnedFd` in the guarded slot.
                            /// Ownership of the descriptor is not transferred.
    1010       766249 :     unsafe fn as_fd(&self) -> RawFd {
    1011       766249 :         let owned_fd: &OwnedFd = self.as_ref();
    1012       766249 :         owned_fd.as_raw_fd()
    1013       766249 :     }
    1014              : }
    1015              : 
    1016              : #[cfg(test)]
    1017              : impl VirtualFile {
                            /// Test-only helper: reads block number `blknum` (i.e. `PAGE_SZ` bytes at
                            /// byte offset `blknum * PAGE_SZ`) into a freshly allocated `Vec` and
                            /// returns it wrapped in `BlockLease::Vec`.
                            ///
                            /// # Errors
                            ///
                            /// Returns the error from [`Self::read_exact_at`].
    1018        20900 :     pub(crate) async fn read_blk(
    1019        20900 :         &self,
    1020        20900 :         blknum: u32,
    1021        20900 :         ctx: &RequestContext,
    1022        20900 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1023        20900 :         use crate::page_cache::PAGE_SZ;
    1024        20900 :         let slice = Vec::with_capacity(PAGE_SZ).slice_full();
    1025        20900 :         assert_eq!(slice.bytes_total(), PAGE_SZ);
    1026        20900 :         let slice = self
    1027        20900 :             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
    1028        10611 :             .await?;
    1029        20900 :         Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
    1030        20900 :     }
    1031              : 
                            /// Test-only helper: reads from the current position until a read returns
                            /// zero bytes, appending everything to `buf` and advancing the position.
                            ///
                            /// Data is read in chunks of at most 128 bytes through a scratch buffer
                            /// that is reused across iterations. `ErrorKind::Interrupted` is retried;
                            /// any other error is returned, with the bytes read so far left in `buf`.
    1032          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
    1033          224 :         let mut tmp = vec![0; 128];
    1034              :         loop {
    1035          444 :             let slice = tmp.slice(..128);
    1036          444 :             let (slice, res) = self.read_at(slice, self.pos, ctx).await;
    1037            2 :             match res {
    1038          222 :                 Ok(0) => return Ok(()),
    1039          220 :                 Ok(n) => {
    1040          220 :                     self.pos += n as u64;
    1041          220 :                     buf.extend_from_slice(&slice[..n]);
    1042          220 :                 }
    1043            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1044            2 :                 Err(e) => return Err(e),
    1045              :             }
                                    // Take the scratch buffer back out of the slice for the next round.
    1046          220 :             tmp = slice.into_inner();
    1047              :         }
    1048          224 :     }
    1049              : }
    1050              : 
    1051              : impl Drop for VirtualFile {
    1052              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1053         4768 :     fn drop(&mut self) {
    1054         4768 :         let handle = self.handle.get_mut();
    1055         4768 : 
    1056         4768 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1057         4768 :             if slot_guard.tag == tag {
    1058         4768 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1059         4768 :                 // For comparison, there is also a "close-by-replace" operation for closes
    1060         4768 :                 // done on eviction.
    1061         4768 :                 if let Some(fd) = slot_guard.file.take() {
    1062         4180 :                     STORAGE_IO_TIME_METRIC
    1063         4180 :                         .get(StorageIoOperation::Close)
    1064         4180 :                         .observe_closure_duration(|| drop(fd));
    1065         4180 :                 }
    1066         4768 :             }
    1067         4768 :         }
    1068         4768 : 
    1069         4768 :         // We don't have async drop so we cannot directly await the lock here.
    1070         4768 :         // Instead, first do a best-effort attempt at closing the underlying
    1071         4768 :         // file descriptor by using `try_write`, and if that fails, spawn
    1072         4768 :         // a tokio task to do it asynchronously: we just want it to be
    1073         4768 :         // cleaned up eventually.
    1074         4768 :         // Most of the time, the `try_write` should succeed though,
    1075         4768 :         // as we have `&mut self` access. In other words, if the slot
    1076         4768 :         // is still occupied by our file, there should be no access from
    1077         4768 :         // other I/O operations; the only other possible place to lock
    1078         4768 :         // the slot is the clock algorithm looking for free slots.
    1079         4768 :         let slot = &get_open_files().slots[handle.index];
    1080         4768 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1081         4768 :             clean_slot(slot, slot_guard, handle.tag);
    1082         4768 :         } else {
    1083            0 :             let tag = handle.tag;
    1084            0 :             tokio::spawn(async move {
    1085            0 :                 let slot_guard = slot.inner.write().await;
    1086            0 :                 clean_slot(slot, slot_guard, tag);
    1087            0 :             });
    1088            0 :         };
    1089         4768 :     }
    1090              : }
    1091              : 
    1092              : impl OwnedAsyncWriter for VirtualFile {
    1093              :     #[inline(always)]
    1094            3 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1095            3 :         &mut self,
    1096            3 :         buf: B,
    1097            3 :         ctx: &RequestContext,
    1098            3 :     ) -> std::io::Result<(usize, B::Buf)> {
    1099            3 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
    1100            3 :         res.map(move |v| (v, buf))
    1101            3 :     }
    1102              : }
    1103              : 
    1104              : impl OpenFiles {
    1105          162 :     fn new(num_slots: usize) -> OpenFiles {
    1106          162 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1107         1620 :         for _ in 0..num_slots {
    1108         1620 :             let slot = Slot {
    1109         1620 :                 recently_used: AtomicBool::new(false),
    1110         1620 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1111         1620 :             };
    1112         1620 :             slots.push(slot);
    1113         1620 :         }
    1114              : 
    1115          162 :         OpenFiles {
    1116          162 :             next: AtomicUsize::new(0),
    1117          162 :             slots: Box::leak(slots),
    1118          162 :         }
    1119          162 :     }
    1120              : }
    1121              : 
    1122              : ///
    1123              : /// Initialize the virtual file module. This must be called once at page
    1124              : /// server startup.
    1125              : ///
    1126              : #[cfg(not(test))]
    1127            0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
    1128            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1129            0 :         panic!("virtual_file::init called twice");
    1130            0 :     }
    1131            0 :     io_engine::init(engine);
    1132            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1133            0 : }
    1134              : 
    1135              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1136              : 
    1137              : // Get a handle to the global slots array.
    1138      1543092 : fn get_open_files() -> &'static OpenFiles {
    1139      1543092 :     //
    1140      1543092 :     // In unit tests, page server startup doesn't happen and no one calls
    1141      1543092 :     // virtual_file::init(). Initialize it here, with a small array.
    1142      1543092 :     //
    1143      1543092 :     // This applies to the virtual file tests below, but all other unit
    1144      1543092 :     // tests too, so the virtual file facility is always usable in
    1145      1543092 :     // unit tests.
    1146      1543092 :     //
    1147      1543092 :     if cfg!(test) {
    1148      1543092 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1149              :     } else {
    1150            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1151              :     }
    1152      1543092 : }
    1153              : 
    1154              : #[cfg(test)]
    1155              : mod tests {
    1156              :     use crate::context::DownloadBehavior;
    1157              :     use crate::task_mgr::TaskKind;
    1158              : 
    1159              :     use super::*;
    1160              :     use owned_buffers_io::slice::SliceExt;
    1161              :     use rand::seq::SliceRandom;
    1162              :     use rand::thread_rng;
    1163              :     use rand::Rng;
    1164              :     use std::io::Write;
    1165              :     use std::os::unix::fs::FileExt;
    1166              :     use std::sync::Arc;
    1167              : 
    1168              :     enum MaybeVirtualFile {
    1169              :         VirtualFile(VirtualFile),
    1170              :         File(File),
    1171              :     }
    1172              : 
    1173              :     impl From<VirtualFile> for MaybeVirtualFile {
    1174            6 :         fn from(vf: VirtualFile) -> Self {
    1175            6 :             MaybeVirtualFile::VirtualFile(vf)
    1176            6 :         }
    1177              :     }
    1178              : 
    1179              :     impl MaybeVirtualFile {
    1180          404 :         async fn read_exact_at(
    1181          404 :             &self,
    1182          404 :             mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
    1183          404 :             offset: u64,
    1184          404 :             ctx: &RequestContext,
    1185          404 :         ) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
    1186          404 :             match self {
    1187          202 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
    1188          202 :                 MaybeVirtualFile::File(file) => {
    1189          202 :                     let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
    1190          202 :                     file.read_exact_at(rust_slice, offset).map(|()| slice)
    1191              :                 }
    1192              :             }
    1193          404 :         }
    1194            8 :         async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1195            8 :             &self,
    1196            8 :             buf: B,
    1197            8 :             offset: u64,
    1198            8 :             ctx: &RequestContext,
    1199            8 :         ) -> Result<(), Error> {
    1200            8 :             match self {
    1201            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1202            4 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1203            4 :                     res
    1204              :                 }
    1205            4 :                 MaybeVirtualFile::File(file) => {
    1206            4 :                     let buf_len = buf.bytes_init();
    1207            4 :                     if buf_len == 0 {
    1208            0 :                         return Ok(());
    1209            4 :                     }
    1210            4 :                     file.write_all_at(&buf.slice(0..buf_len), offset)
    1211              :                 }
    1212              :             }
    1213            8 :         }
    1214           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1215           36 :             match self {
    1216           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1217           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1218              :             }
    1219           36 :         }
    1220            8 :         async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1221            8 :             &mut self,
    1222            8 :             buf: B,
    1223            8 :             ctx: &RequestContext,
    1224            8 :         ) -> Result<(), Error> {
    1225            8 :             match self {
    1226            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1227            4 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1228            4 :                     res.map(|_| ())
    1229              :                 }
    1230            4 :                 MaybeVirtualFile::File(file) => {
    1231            4 :                     let buf_len = buf.bytes_init();
    1232            4 :                     if buf_len == 0 {
    1233            0 :                         return Ok(());
    1234            4 :                     }
    1235            4 :                     file.write_all(&buf.slice(0..buf_len))
    1236              :                 }
    1237              :             }
    1238            8 :         }
    1239              : 
    1240              :         // Helper function to slurp contents of a file, starting at the current position,
    1241              :         // into a string
    1242          442 :         async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
    1243          442 :             use std::io::Read;
    1244          442 :             let mut buf = String::new();
    1245          442 :             match self {
    1246          224 :                 MaybeVirtualFile::VirtualFile(file) => {
    1247          224 :                     let mut buf = Vec::new();
    1248          226 :                     file.read_to_end(&mut buf, ctx).await?;
    1249          222 :                     return Ok(String::from_utf8(buf).unwrap());
    1250              :                 }
    1251          218 :                 MaybeVirtualFile::File(file) => {
    1252          218 :                     file.read_to_string(&mut buf)?;
    1253              :                 }
    1254              :             }
    1255          216 :             Ok(buf)
    1256          442 :         }
    1257              : 
    1258              :         // Helper function to slurp a portion of a file into a string
    1259          404 :         async fn read_string_at(
    1260          404 :             &mut self,
    1261          404 :             pos: u64,
    1262          404 :             len: usize,
    1263          404 :             ctx: &RequestContext,
    1264          404 :         ) -> Result<String, Error> {
    1265          404 :             let slice = Vec::with_capacity(len).slice_full();
    1266          404 :             assert_eq!(slice.bytes_total(), len);
    1267          404 :             let slice = self.read_exact_at(slice, pos, ctx).await?;
    1268          404 :             let vec = slice.into_inner();
    1269          404 :             assert_eq!(vec.len(), len);
    1270          404 :             Ok(String::from_utf8(vec).unwrap())
    1271          404 :         }
    1272              :     }
    1273              : 
    1274              :     #[tokio::test]
    1275            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1276            2 :         // The real work is done in the test_files() helper function. This
    1277            2 :         // allows us to run the same set of tests against a native File, and
    1278            2 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1279            2 :         // but this allows us to verify that the operations return the same
    1280            2 :         // results with VirtualFiles as with native Files. (Except that with
    1281            2 :         // native files, you will run out of file descriptors if the ulimit
    1282            2 :         // is low enough.)
    1283            2 :         struct A;
    1284            2 : 
    1285            2 :         impl Adapter for A {
    1286          206 :             async fn open(
    1287          206 :                 path: Utf8PathBuf,
    1288          206 :                 opts: OpenOptions,
    1289          206 :                 ctx: &RequestContext,
    1290          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1291          206 :                 let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
    1292          206 :                 Ok(MaybeVirtualFile::VirtualFile(vf))
    1293          206 :             }
    1294            2 :         }
    1295          530 :         test_files::<A>("virtual_files").await
    1296            2 :     }
    1297              : 
    1298              :     #[tokio::test]
    1299            2 :     async fn test_physical_files() -> anyhow::Result<()> {
    1300            2 :         struct B;
    1301            2 : 
    1302            2 :         impl Adapter for B {
    1303          206 :             async fn open(
    1304          206 :                 path: Utf8PathBuf,
    1305          206 :                 opts: OpenOptions,
    1306          206 :                 _ctx: &RequestContext,
    1307          206 :             ) -> Result<MaybeVirtualFile, anyhow::Error> {
    1308          206 :                 Ok(MaybeVirtualFile::File({
    1309          206 :                     let owned_fd = opts.open(path.as_std_path()).await?;
    1310          206 :                     File::from(owned_fd)
    1311            2 :                 }))
    1312          206 :             }
    1313            2 :         }
    1314            2 : 
    1315          104 :         test_files::<B>("physical_files").await
    1316            2 :     }
    1317              : 
    1318              :     /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
    1319              :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is an async function
    1320              :     /// in trait which benefits from the new lifetime capture rules already.
    1321              :     trait Adapter {
    1322              :         async fn open(
    1323              :             path: Utf8PathBuf,
    1324              :             opts: OpenOptions,
    1325              :             ctx: &RequestContext,
    1326              :         ) -> Result<MaybeVirtualFile, anyhow::Error>;
    1327              :     }
    1328              : 
    1329            4 :     async fn test_files<A>(testname: &str) -> anyhow::Result<()>
    1330            4 :     where
    1331            4 :         A: Adapter,
    1332            4 :     {
    1333            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1334            4 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1335            4 :         std::fs::create_dir_all(&testdir)?;
    1336              : 
    1337            4 :         let path_a = testdir.join("file_a");
    1338            4 :         let mut file_a = A::open(
    1339            4 :             path_a.clone(),
    1340            4 :             OpenOptions::new()
    1341            4 :                 .write(true)
    1342            4 :                 .create(true)
    1343            4 :                 .truncate(true)
    1344            4 :                 .to_owned(),
    1345            4 :             &ctx,
    1346            4 :         )
    1347            4 :         .await?;
    1348            4 :         file_a.write_all(b"foobar".to_vec(), &ctx).await?;
    1349              : 
    1350              :         // cannot read from a file opened in write-only mode
    1351            4 :         let _ = file_a.read_string(&ctx).await.unwrap_err();
    1352              : 
    1353              :         // Close the file and re-open for reading
    1354            4 :         let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
    1355              : 
    1356              :         // cannot write to a file opened in read-only mode
    1357            4 :         let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
    1358            4 : 
    1359            4 :         // Try simple read
    1360            4 :         assert_eq!("foobar", file_a.read_string(&ctx).await?);
    1361              : 
    1362              :         // It's positioned at the EOF now.
    1363            4 :         assert_eq!("", file_a.read_string(&ctx).await?);
    1364              : 
    1365              :         // Test seeks.
    1366            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1367            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1368              : 
    1369            4 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1370            4 :         assert_eq!("ar", file_a.read_string(&ctx).await?);
    1371              : 
    1372            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1373            4 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1374            4 :         assert_eq!("bar", file_a.read_string(&ctx).await?);
    1375              : 
    1376            4 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1377            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1378              : 
    1379              :         // Test erroneous seeks to before byte 0
    1380            4 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1381            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1382            4 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1383            4 : 
    1384            4 :         // the erroneous seek should have left the position unchanged
    1385            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1386              : 
    1387              :         // Create another test file, and try FileExt functions on it.
    1388            4 :         let path_b = testdir.join("file_b");
    1389            4 :         let mut file_b = A::open(
    1390            4 :             path_b.clone(),
    1391            4 :             OpenOptions::new()
    1392            4 :                 .read(true)
    1393            4 :                 .write(true)
    1394            4 :                 .create(true)
    1395            4 :                 .truncate(true)
    1396            4 :                 .to_owned(),
    1397            4 :             &ctx,
    1398            4 :         )
    1399            2 :         .await?;
    1400            4 :         file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
    1401            4 :         file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
    1402              : 
    1403            4 :         assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
    1404              : 
    1405              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1406              :         // open the same file many times. The effect is the same.)
    1407              :         //
    1408              :         // leave file_a positioned at offset 1 before we start
    1409            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1410              : 
    1411            4 :         let mut vfiles = Vec::new();
    1412          404 :         for _ in 0..100 {
    1413          400 :             let mut vfile = A::open(
    1414          400 :                 path_b.clone(),
    1415          400 :                 OpenOptions::new().read(true).to_owned(),
    1416          400 :                 &ctx,
    1417          400 :             )
    1418          200 :             .await?;
    1419          400 :             assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
    1420          400 :             vfiles.push(vfile);
    1421              :         }
    1422              : 
    1423              :         // make sure we opened enough files to definitely cause evictions.
    1424            4 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1425              : 
    1426              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1427              :         // from it again. We left the file positioned at offset 1 above.
    1428            4 :         assert_eq!("oobar", file_a.read_string(&ctx).await?);
    1429              : 
    1430              :         // Check that all the other FDs still work too. Use them in random order for
    1431              :         // good measure.
    1432            4 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1433          400 :         for vfile in vfiles.iter_mut() {
    1434          400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
    1435              :         }
    1436              : 
    1437            4 :         Ok(())
    1438            4 :     }
    1439              : 
    1440              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1441              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1442              :     /// VirtualFile from multiple threads concurrently.
    1443              :     #[tokio::test]
    1444            2 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1445            2 :         const SIZE: usize = 8 * 1024;
    1446            2 :         const VIRTUAL_FILES: usize = 100;
    1447            2 :         const THREADS: usize = 100;
    1448            2 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1449            2 : 
    1450            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1451            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1452            2 :         std::fs::create_dir_all(&testdir)?;
    1453            2 : 
    1454            2 :         // Create a test file.
    1455            2 :         let test_file_path = testdir.join("concurrency_test_file");
    1456            2 :         {
    1457            2 :             let file = File::create(&test_file_path)?;
    1458            2 :             file.write_all_at(&SAMPLE, 0)?;
    1459            2 :         }
    1460            2 : 
    1461            2 :         // Open the file many times.
    1462            2 :         let mut files = Vec::new();
    1463          202 :         for _ in 0..VIRTUAL_FILES {
    1464          200 :             let f = VirtualFile::open_with_options(
    1465          200 :                 &test_file_path,
    1466          200 :                 OpenOptions::new().read(true),
    1467          200 :                 &ctx,
    1468          200 :             )
    1469          101 :             .await?;
    1470          200 :             files.push(f);
    1471            2 :         }
    1472            2 :         let files = Arc::new(files);
    1473            2 : 
    1474            2 :         // Launch many threads, and use the virtual files concurrently in random order.
    1475            2 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1476            2 :             .worker_threads(THREADS)
    1477            2 :             .thread_name("test_vfile_concurrency thread")
    1478            2 :             .build()
    1479            2 :             .unwrap();
    1480            2 :         let mut hdls = Vec::new();
    1481          202 :         for _threadno in 0..THREADS {
    1482          200 :             let files = files.clone();
    1483          200 :             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
    1484          200 :             let hdl = rt.spawn(async move {
    1485          200 :                 let mut buf = vec![0u8; SIZE];
    1486          200 :                 let mut rng = rand::rngs::OsRng;
    1487       200000 :                 for _ in 1..1000 {
    1488       199800 :                     let f = &files[rng.gen_range(0..files.len())];
    1489       199800 :                     buf = f
    1490       199800 :                         .read_exact_at(buf.slice_full(), 0, &ctx)
    1491       632320 :                         .await
    1492       199800 :                         .unwrap()
    1493       199800 :                         .into_inner();
    1494       199800 :                     assert!(buf == SAMPLE);
    1495            2 :                 }
    1496          200 :             });
    1497          200 :             hdls.push(hdl);
    1498          200 :         }
    1499          202 :         for hdl in hdls {
    1500          200 :             hdl.await?;
    1501            2 :         }
    1502            2 :         std::mem::forget(rt);
    1503            2 : 
    1504            2 :         Ok(())
    1505            2 :     }
    1506              : 
    1507              :     #[tokio::test]
    1508            2 :     async fn test_atomic_overwrite_basic() {
    1509            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1510            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1511            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1512            2 : 
    1513            2 :         let path = testdir.join("myfile");
    1514            2 :         let tmp_path = testdir.join("myfile.tmp");
    1515            2 : 
    1516            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1517            2 :             .await
    1518            2 :             .unwrap();
    1519            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1520            2 :         let post = file.read_string(&ctx).await.unwrap();
    1521            2 :         assert_eq!(post, "foo");
    1522            2 :         assert!(!tmp_path.exists());
    1523            2 :         drop(file);
    1524            2 : 
    1525            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1526            2 :             .await
    1527            2 :             .unwrap();
    1528            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1529            2 :         let post = file.read_string(&ctx).await.unwrap();
    1530            2 :         assert_eq!(post, "bar");
    1531            2 :         assert!(!tmp_path.exists());
    1532            2 :         drop(file);
    1533            2 :     }
    1534              : 
    1535              :     #[tokio::test]
    1536            2 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1537            2 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1538            2 :         let testdir =
    1539            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1540            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1541            2 : 
    1542            2 :         let path = testdir.join("myfile");
    1543            2 :         let tmp_path = testdir.join("myfile.tmp");
    1544            2 : 
    1545            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1546            2 :         assert!(tmp_path.exists());
    1547            2 : 
    1548            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1549            2 :             .await
    1550            2 :             .unwrap();
    1551            2 : 
    1552            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
    1553            2 :         let post = file.read_string(&ctx).await.unwrap();
    1554            2 :         assert_eq!(post, "foo");
    1555            2 :         assert!(!tmp_path.exists());
    1556            2 :         drop(file);
    1557            2 :     }
    1558              : }
        

Generated by: LCOV version 2.1-beta