LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 92.5 % 929 859
Test Date: 2024-05-10 13:18:37 Functions: 90.5 % 201 182

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::context::RequestContext;
      14              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      15              : 
      16              : use crate::page_cache::PageWriteGuard;
      17              : use crate::tenant::TENANTS_SEGMENT_NAME;
      18              : use camino::{Utf8Path, Utf8PathBuf};
      19              : use once_cell::sync::OnceCell;
      20              : use pageserver_api::shard::TenantShardId;
      21              : use std::fs::File;
      22              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      23              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      24              : 
      25              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      26              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      27              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      28              : use tokio::time::Instant;
      29              : 
      30              : pub use pageserver_api::models::virtual_file as api;
      31              : pub(crate) mod io_engine;
      32              : pub use io_engine::feature_test as io_engine_feature_test;
      33              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      34              : mod metadata;
      35              : mod open_options;
      36              : use self::owned_buffers_io::write::OwnedAsyncWriter;
      37              : pub(crate) use io_engine::IoEngineKind;
      38              : pub(crate) use metadata::Metadata;
      39              : pub(crate) use open_options::*;
      40              : 
      41              : pub(crate) mod owned_buffers_io {
      42              :     //! Abstractions for IO with owned buffers.
      43              :     //!
      44              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      45              :     //! reason we need this abstraction.
      46              :     //!
      47              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      48              :     //! but for the time being we're proving out the primitives in the neon.git repo
      49              :     //! for faster iteration.
      50              : 
      51              :     pub(crate) mod write;
      52              :     pub(crate) mod util {
      53              :         pub(crate) mod size_tracking_writer;
      54              :     }
      55              : }
      56              : 
      57              : ///
      58              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      59              : /// the underlying file is closed if the system is low on file descriptors,
      60              : /// and re-opened when it's accessed again.
      61              : ///
      62              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      63              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      64              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      65              : /// require a mutable reference, because they modify the "current position".
      66              : ///
      67              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      68              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      69              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      70              : /// 'tag' field is used to detect whether the handle still is valid or not.
      71              : ///
      72              : #[derive(Debug)]
      73              : pub struct VirtualFile {
      74              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      75              :     /// might contain our File, or it may be empty, or it may contain a File that
      76              :     /// belongs to a different VirtualFile.
      77              :     handle: RwLock<SlotHandle>,
      78              : 
      79              :     /// Current file position
      80              :     pos: u64,
      81              : 
      82              :     /// File path and options to use to open it.
      83              :     ///
      84              :     /// Note: this only contains the options needed to re-open it. For example,
      85              :     /// if a new file is created, we only pass the create flag when it's initially
      86              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      87              :     /// storing it here.
      88              :     pub path: Utf8PathBuf,
      89              :     open_options: OpenOptions,
      90              : 
      91              :     // These are strings because we only use them for metrics, and those expect strings.
      92              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      93              :     // strings.
      94              :     tenant_id: String,
      95              :     shard_id: String,
      96              :     timeline_id: String,
      97              : }
      98              : 
      99              : #[derive(Debug, PartialEq, Clone, Copy)]
     100              : struct SlotHandle {
     101              :     /// Index into OPEN_FILES.slots
     102              :     index: usize,
     103              : 
     104              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     105              :     /// been recycled and no longer contains the FD for this virtual file.
     106              :     tag: u64,
     107              : }
     108              : 
     109              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     110              : /// are currently open. Each slot in the array is protected by a separate lock,
     111              : /// so that different files can be accessed independently. The lock must be held
     112              : /// in write mode to replace the slot with a different file, but a read mode
     113              : /// is enough to operate on the file, whether you're reading or writing to it.
     114              : ///
     115              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     116              : /// the virtual_file::init() function. It must be called exactly once at page
     117              : /// server startup.
     118              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     119              : 
     120              : struct OpenFiles {
     121              :     slots: &'static [Slot],
     122              : 
     123              :     /// clock arm for the clock algorithm
     124              :     next: AtomicUsize,
     125              : }
     126              : 
     127              : struct Slot {
     128              :     inner: RwLock<SlotInner>,
     129              : 
     130              :     /// has this file been used since last clock sweep?
     131              :     recently_used: AtomicBool,
     132              : }
     133              : 
     134              : struct SlotInner {
     135              :     /// Counter that's incremented every time a different file is stored here.
     136              :     /// To avoid the ABA problem.
     137              :     tag: u64,
     138              : 
     139              :     /// the underlying file
     140              :     file: Option<OwnedFd>,
     141              : }
     142              : 
     143              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     144              : struct PageWriteGuardBuf {
     145              :     page: PageWriteGuard<'static>,
     146              :     init_up_to: usize,
     147              : }
     148              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     149              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     150              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     151       111465 :     fn stable_ptr(&self) -> *const u8 {
     152       111465 :         self.page.as_ptr()
     153       111465 :     }
     154       334395 :     fn bytes_init(&self) -> usize {
     155       334395 :         self.init_up_to
     156       334395 :     }
     157       111465 :     fn bytes_total(&self) -> usize {
     158       111465 :         self.page.len()
     159       111465 :     }
     160              : }
     161              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     162              : // hence it's safe to hand out the `stable_mut_ptr()`.
     163              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     164       111465 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     165       111465 :         self.page.as_mut_ptr()
     166       111465 :     }
     167              : 
     168       111465 :     unsafe fn set_init(&mut self, pos: usize) {
     169       111465 :         assert!(pos <= self.page.len());
     170       111465 :         self.init_up_to = pos;
     171       111465 :     }
     172              : }
     173              : 
     174              : impl OpenFiles {
     175              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     176              :     ///
     177              :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
     178              :     /// recently_used has been set. It's all ready for reuse.
     179       195433 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     180       195433 :         //
     181       195433 :         // Run the clock algorithm to find a slot to replace.
     182       195433 :         //
     183       195433 :         let num_slots = self.slots.len();
     184       195433 :         let mut retries = 0;
     185              :         let mut slot;
     186              :         let mut slot_guard;
     187              :         let index;
     188      2697259 :         loop {
     189      2697259 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     190      2697259 :             slot = &self.slots[next];
     191      2697259 : 
     192      2697259 :             // If the recently_used flag on this slot is set, continue the clock
     193      2697259 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     194      2697259 :             // lock, also continue the clock sweep.
     195      2697259 :             //
     196      2697259 :             // We only continue in this manner for a while, though. If we loop
     197      2697259 :             // through the array twice without finding a victim, just pick the
     198      2697259 :             // next slot and wait until we can reuse it. This way, we avoid
     199      2697259 :             // spinning in the extreme case that all the slots are busy with an
     200      2697259 :             // I/O operation.
     201      2697259 :             if retries < num_slots * 2 {
     202      2591567 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     203      2382628 :                     if let Ok(guard) = slot.inner.try_write() {
     204        89741 :                         slot_guard = guard;
     205        89741 :                         index = next;
     206        89741 :                         break;
     207      2292887 :                     }
     208       208939 :                 }
     209      2501826 :                 retries += 1;
     210              :             } else {
     211       105692 :                 slot_guard = slot.inner.write().await;
     212       105692 :                 index = next;
     213       105692 :                 break;
     214              :             }
     215              :         }
     216              : 
     217              :         //
     218              :         // We now have the victim slot locked. If it was in use previously, close the
     219              :         // old file.
     220              :         //
     221       195433 :         if let Some(old_file) = slot_guard.file.take() {
     222       191952 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     223       191952 :             // distinguish the two.
     224       191952 :             STORAGE_IO_TIME_METRIC
     225       191952 :                 .get(StorageIoOperation::CloseByReplace)
     226       191952 :                 .observe_closure_duration(|| drop(old_file));
     227       191952 :         }
     228              : 
     229              :         // Prepare the slot for reuse and return it
     230       195433 :         slot_guard.tag += 1;
     231       195433 :         slot.recently_used.store(true, Ordering::Relaxed);
     232       195433 :         (
     233       195433 :             SlotHandle {
     234       195433 :                 index,
     235       195433 :                 tag: slot_guard.tag,
     236       195433 :             },
     237       195433 :             slot_guard,
     238       195433 :         )
     239       195433 :     }
     240              : }
     241              : 
     242              : /// Identify error types that should always terminate the process.  Other
     243              : /// error types may be eligible for retry.
     244            0 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     245            0 :     use nix::errno::Errno::*;
     246            0 :     match e.raw_os_error().map(nix::errno::from_i32) {
     247              :         Some(EIO) => {
     248              :             // Terminate on EIO because we no longer trust the device to store
     249              :             // data safely, or to uphold persistence guarantees on fsync.
     250            0 :             true
     251              :         }
     252              :         Some(EROFS) => {
     253              :             // Terminate on EROFS because a filesystem is usually remounted
     254              :             // readonly when it has experienced some critical issue, so the same
     255              :             // logic as EIO applies.
     256            0 :             true
     257              :         }
     258              :         Some(EACCES) => {
     259              :             // Terminate on EACCES because we should always have permissions
     260              :             // for our own data dir: if we don't, then we can't do our job and
     261              :             // need administrative intervention to fix permissions.  Terminating
     262              :             // is the best way to make sure we stop cleanly rather than going
     263              :             // into infinite retry loops, and will make it clear to the outside
     264              :             // world that we need help.
     265            0 :             true
     266              :         }
     267              :         _ => {
     268              :             // Treat all other local file I/O errors as retryable.  This includes:
     269              :             // - ENOSPC: we stay up and wait for eviction to free some space
     270              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     271              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     272            0 :             false
     273              :         }
     274              :     }
     275            0 : }
     276              : 
     277              : /// Call this when the local filesystem gives us an error with an external
     278              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     279              : /// bad storage or bad configuration, and we can't fix that from inside
     280              : /// a running process.
     281            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     282            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     283            0 :     std::process::abort();
     284              : }
     285              : 
     286              : pub(crate) trait MaybeFatalIo<T> {
     287              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     288              :     fn fatal_err(self, context: &str) -> T;
     289              : }
     290              : 
     291              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     292              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     293              :     ///
     294              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     295              :     /// not on ENOSPC.
     296           22 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     297           22 :         if let Err(e) = &self {
     298            0 :             if is_fatal_io_error(e) {
     299            0 :                 on_fatal_io_error(e, context);
     300            0 :             }
     301           22 :         }
     302           22 :         self
     303           22 :     }
     304              : 
     305              :     /// Terminate the process on any I/O error.
     306              :     ///
     307              :     /// This is appropriate for reads on files that we know exist: they should always work.
     308         1950 :     fn fatal_err(self, context: &str) -> T {
     309         1950 :         match self {
     310         1950 :             Ok(v) => v,
     311            0 :             Err(e) => {
     312            0 :                 on_fatal_io_error(&e, context);
     313              :             }
     314              :         }
     315         1950 :     }
     316              : }
     317              : 
     318              : /// Observe duration for the given storage I/O operation
     319              : ///
     320              : /// Unlike `observe_closure_duration`, this supports async,
     321              : /// where "support" means that we measure wall clock time.
     322              : macro_rules! observe_duration {
     323              :     ($op:expr, $($body:tt)*) => {{
     324              :         let instant = Instant::now();
     325              :         let result = $($body)*;
     326              :         let elapsed = instant.elapsed().as_secs_f64();
     327              :         STORAGE_IO_TIME_METRIC
     328              :             .get($op)
     329              :             .observe(elapsed);
     330              :         result
     331              :     }}
     332              : }
     333              : 
     334              : macro_rules! with_file {
     335              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     336              :         let $ident = $this.lock_file().await?;
     337              :         observe_duration!($op, $($body)*)
     338              :     }};
     339              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     340              :         let mut $ident = $this.lock_file().await?;
     341              :         observe_duration!($op, $($body)*)
     342              :     }};
     343              : }
     344              : 
     345              : impl VirtualFile {
     346              :     /// Open a file in read-only mode. Like File::open.
     347         1832 :     pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     348         1832 :         Self::open_with_options(path, OpenOptions::new().read(true)).await
     349         1832 :     }
     350              : 
     351              :     /// Create a new file for writing. If the file exists, it will be truncated.
     352              :     /// Like File::create.
     353         1089 :     pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     354         1089 :         Self::open_with_options(
     355         1089 :             path,
     356         1089 :             OpenOptions::new().write(true).create(true).truncate(true),
     357         1089 :         )
     358          563 :         .await
     359         1089 :     }
     360              : 
     361              :     /// Open a file with given options.
     362              :     ///
     363              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     364              :     /// they will be applied also when the file is subsequently re-opened, not only
     365              :     /// on the first time. Make sure that's sane!
     366         4495 :     pub async fn open_with_options(
     367         4495 :         path: &Utf8Path,
     368         4495 :         open_options: &OpenOptions,
     369         4495 :     ) -> Result<VirtualFile, std::io::Error> {
     370         4495 :         let path_str = path.to_string();
     371         4495 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     372         4495 :         let (tenant_id, shard_id, timeline_id) =
     373         4495 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     374         3077 :                 let tenant_shard_part = parts[parts.len() - 4];
     375         3077 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     376         3077 :                     Ok(tenant_shard_id) => (
     377         3077 :                         tenant_shard_id.tenant_id.to_string(),
     378         3077 :                         format!("{}", tenant_shard_id.shard_slug()),
     379         3077 :                     ),
     380              :                     Err(_) => {
     381              :                         // Malformed path: this ID is just for observability, so tolerate it
     382              :                         // and pass through
     383            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     384              :                     }
     385              :                 };
     386         3077 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     387              :             } else {
     388         1418 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     389              :             };
     390         4495 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     391              : 
     392              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     393              :         // where our caller doesn't get to use the returned VirtualFile before its
     394              :         // slot gets re-used by someone else.
     395         4495 :         let file = observe_duration!(StorageIoOperation::Open, {
     396         4495 :             open_options.open(path.as_std_path()).await?
     397              :         });
     398              : 
     399              :         // Strip all options other than read and write.
     400              :         //
     401              :         // It would perhaps be nicer to check just for the read and write flags
     402              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     403              :         // only to set them.
     404         4495 :         let mut reopen_options = open_options.clone();
     405         4495 :         reopen_options.create(false);
     406         4495 :         reopen_options.create_new(false);
     407         4495 :         reopen_options.truncate(false);
     408         4495 : 
     409         4495 :         let vfile = VirtualFile {
     410         4495 :             handle: RwLock::new(handle),
     411         4495 :             pos: 0,
     412         4495 :             path: path.to_path_buf(),
     413         4495 :             open_options: reopen_options,
     414         4495 :             tenant_id,
     415         4495 :             shard_id,
     416         4495 :             timeline_id,
     417         4495 :         };
     418         4495 : 
     419         4495 :         // TODO: Under pressure, it's likely the slot will get re-used and
     420         4495 :         // the underlying file closed before they get around to using it.
     421         4495 :         // => https://github.com/neondatabase/neon/issues/6065
     422         4495 :         slot_guard.file.replace(file);
     423         4495 : 
     424         4495 :         Ok(vfile)
     425         4495 :     }
     426              : 
     427              :     /// Async version of [`::utils::crashsafe::overwrite`].
     428              :     ///
     429              :     /// # NB:
     430              :     ///
     431              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     432              :     /// it did at an earlier time.
     433              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     434           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     435           28 :         final_path: Utf8PathBuf,
     436           28 :         tmp_path: Utf8PathBuf,
     437           28 :         content: B,
     438           28 :     ) -> std::io::Result<()> {
     439           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     440           28 :         // See https://github.com/neondatabase/neon/issues/6663
     441           28 : 
     442           28 :         tokio::task::spawn_blocking(move || {
     443           28 :             let slice_storage;
     444           28 :             let content_len = content.bytes_init();
     445           28 :             let content = if content.bytes_init() > 0 {
     446           28 :                 slice_storage = Some(content.slice(0..content_len));
     447           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     448              :             } else {
     449            0 :                 &[]
     450              :             };
     451           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     452           28 :         })
     453           28 :         .await
     454           28 :         .expect("blocking task is never aborted")
     455           28 :     }
     456              : 
     457              :     /// Call File::sync_all() on the underlying File.
     458         2139 :     pub async fn sync_all(&self) -> Result<(), Error> {
     459         2139 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     460         2139 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     461         2139 :             res
     462              :         })
     463         2139 :     }
     464              : 
     465              :     /// Call File::sync_data() on the underlying File.
     466            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     467            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     468            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     469            0 :             res
     470              :         })
     471            0 :     }
     472              : 
     473         1180 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     474         1180 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     475         1180 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     476         1180 :             res
     477              :         })
     478         1180 :     }
     479              : 
     480              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     481              :     /// opens it and evicts some other File if necessary. The passed parameter is
     482              :     /// assumed to be a function available for the physical `File`.
     483              :     ///
     484              :     /// We are doing it via a macro as Rust doesn't support async closures that
     485              :     /// take parameters with lifetimes.
     486       406593 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     487       406593 :         let open_files = get_open_files();
     488              : 
     489       190938 :         let mut handle_guard = {
     490              :             // Read the cached slot handle, and see if the slot that it points to still
     491              :             // contains our File.
     492              :             //
     493              :             // We only need to hold the handle lock while we read the current handle. If
     494              :             // another thread closes the file and recycles the slot for a different file,
     495              :             // we will notice that the handle we read is no longer valid and retry.
     496       406593 :             let mut handle = *self.handle.read().await;
     497       513203 :             loop {
     498       513203 :                 // Check if the slot contains our File
     499       513203 :                 {
     500       513203 :                     let slot = &open_files.slots[handle.index];
     501       513203 :                     let slot_guard = slot.inner.read().await;
     502       513203 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     503              :                         // Found a cached file descriptor.
     504       215655 :                         slot.recently_used.store(true, Ordering::Relaxed);
     505       215655 :                         return Ok(FileGuard { slot_guard });
     506       297548 :                     }
     507              :                 }
     508              : 
     509              :                 // The slot didn't contain our File. We will have to open it ourselves,
     510              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     511              :                 // that no other thread will try to concurrently open the same file.
     512       297548 :                 let handle_guard = self.handle.write().await;
     513              : 
     514              :                 // If another thread changed the handle while we were not holding the lock,
     515              :                 // then the handle might now be valid again. Loop back to retry.
     516       297548 :                 if *handle_guard != handle {
     517       106610 :                     handle = *handle_guard;
     518       106610 :                     continue;
     519       190938 :                 }
     520       190938 :                 break handle_guard;
     521              :             }
     522              :         };
     523              : 
     524              :         // We need to open the file ourselves. The handle in the VirtualFile is
     525              :         // now locked in write-mode. Find a free slot to put it in.
     526       190938 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     527              : 
     528              :         // Re-open the physical file.
     529              :         // NB: we use StorageIoOperation::OpenAfterReplace for this to distinguish this
     530              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     531              :         // of the virtual file descriptor cache.
     532       190938 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     533       190938 :             self.open_options.open(self.path.as_std_path()).await?
     534              :         });
     535              : 
     536              :         // Store the File in the slot and update the handle in the VirtualFile
     537              :         // to point to it.
     538       190938 :         slot_guard.file.replace(file);
     539       190938 : 
     540       190938 :         *handle_guard = handle;
     541       190938 : 
     542       190938 :         return Ok(FileGuard {
     543       190938 :             slot_guard: slot_guard.downgrade(),
     544       190938 :         });
     545       406593 :     }
     546              : 
     547            0 :     pub fn remove(self) {
     548            0 :         let path = self.path.clone();
     549            0 :         drop(self);
     550            0 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     551            0 :     }
     552              : 
     553         3552 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     554         3552 :         match pos {
     555         3542 :             SeekFrom::Start(offset) => {
     556         3542 :                 self.pos = offset;
     557         3542 :             }
     558            4 :             SeekFrom::End(offset) => {
     559            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     560            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     561              :             }
     562            6 :             SeekFrom::Current(offset) => {
     563            6 :                 let pos = self.pos as i128 + offset as i128;
     564            6 :                 if pos < 0 {
     565            2 :                     return Err(Error::new(
     566            2 :                         ErrorKind::InvalidInput,
     567            2 :                         "offset would be negative",
     568            2 :                     ));
     569            4 :                 }
     570            4 :                 if pos > u64::MAX as i128 {
     571            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     572            4 :                 }
     573            4 :                 self.pos = pos as u64;
     574              :             }
     575              :         }
     576         3548 :         Ok(self.pos)
     577         3552 :     }
     578              : 
     579       331667 :     pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
     580       331667 :     where
     581       331667 :         B: IoBufMut + Send,
     582       331667 :     {
     583       331667 :         let (buf, res) =
     584       729025 :             read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
     585       331667 :         res.map(|()| buf)
     586       331667 :     }
     587              : 
     588        30196 :     pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
     589        30196 :     where
     590        30196 :         B: IoBufMut + Send,
     591        30196 :     {
     592        30196 :         let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
     593        30196 :             self.read_at(buf, offset)
     594        30196 :         })
     595        15369 :         .await;
     596        30196 :         res.map(|()| buf)
     597        30196 :     }
     598              : 
     599              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     600       111465 :     pub async fn read_exact_at_page(
     601       111465 :         &self,
     602       111465 :         page: PageWriteGuard<'static>,
     603       111465 :         offset: u64,
     604       111465 :     ) -> Result<PageWriteGuard<'static>, Error> {
     605       111465 :         let buf = PageWriteGuardBuf {
     606       111465 :             page,
     607       111465 :             init_up_to: 0,
     608       111465 :         };
     609       111465 :         let res = self.read_exact_at(buf, offset).await;
     610       111465 :         res.map(|PageWriteGuardBuf { page, .. }| page)
     611       111465 :             .map_err(|e| Error::new(ErrorKind::Other, e))
     612       111465 :     }
     613              : 
     614              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     615            4 :     pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     616            4 :         &self,
     617            4 :         buf: B,
     618            4 :         mut offset: u64,
     619            4 :         ctx: &RequestContext,
     620            4 :     ) -> (B::Buf, Result<(), Error>) {
     621            4 :         let buf_len = buf.bytes_init();
     622            4 :         if buf_len == 0 {
     623            0 :             return (Slice::into_inner(buf.slice_full()), Ok(()));
     624            4 :         }
     625            4 :         let mut buf = buf.slice(0..buf_len);
     626            8 :         while !buf.is_empty() {
     627              :             let res;
     628            4 :             (buf, res) = self.write_at(buf, offset, ctx).await;
     629            0 :             match res {
     630              :                 Ok(0) => {
     631            0 :                     return (
     632            0 :                         Slice::into_inner(buf),
     633            0 :                         Err(Error::new(
     634            0 :                             std::io::ErrorKind::WriteZero,
     635            0 :                             "failed to write whole buffer",
     636            0 :                         )),
     637            0 :                     );
     638              :                 }
     639            4 :                 Ok(n) => {
     640            4 :                     buf = buf.slice(n..);
     641            4 :                     offset += n as u64;
     642            4 :                 }
     643            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     644            0 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     645              :             }
     646              :         }
     647            4 :         (Slice::into_inner(buf), Ok(()))
     648            4 :     }
     649              : 
     650              :     /// Writes `buf.slice(0..buf.bytes_init())`.
     651              :     /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
     652              :     /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
     653              :     /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
     654        40991 :     pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     655        40991 :         &mut self,
     656        40991 :         buf: B,
     657        40991 :         ctx: &RequestContext,
     658        40991 :     ) -> (B::Buf, Result<usize, Error>) {
     659        40991 :         let nbytes = buf.bytes_init();
     660        40991 :         if nbytes == 0 {
     661           32 :             return (Slice::into_inner(buf.slice_full()), Ok(0));
     662        40959 :         }
     663        40959 :         let mut buf = buf.slice(0..nbytes);
     664        81916 :         while !buf.is_empty() {
     665              :             let res;
     666        40959 :             (buf, res) = self.write(buf, ctx).await;
     667            2 :             match res {
     668              :                 Ok(0) => {
     669            0 :                     return (
     670            0 :                         Slice::into_inner(buf),
     671            0 :                         Err(Error::new(
     672            0 :                             std::io::ErrorKind::WriteZero,
     673            0 :                             "failed to write whole buffer",
     674            0 :                         )),
     675            0 :                     );
     676              :                 }
     677        40957 :                 Ok(n) => {
     678        40957 :                     buf = buf.slice(n..);
     679        40957 :                 }
     680            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     681            2 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     682              :             }
     683              :         }
     684        40957 :         (Slice::into_inner(buf), Ok(nbytes))
     685        40991 :     }
     686              : 
     687        40959 :     async fn write<B: IoBuf + Send>(
     688        40959 :         &mut self,
     689        40959 :         buf: Slice<B>,
     690        40959 :         ctx: &RequestContext,
     691        40959 :     ) -> (Slice<B>, Result<usize, std::io::Error>) {
     692        40959 :         let pos = self.pos;
     693        40959 :         let (buf, res) = self.write_at(buf, pos, ctx).await;
     694        40959 :         let n = match res {
     695        40957 :             Ok(n) => n,
     696            2 :             Err(e) => return (buf, Err(e)),
     697              :         };
     698        40957 :         self.pos += n as u64;
     699        40957 :         (buf, Ok(n))
     700        40959 :     }
     701              : 
     702       362307 :     pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
     703       362307 :     where
     704       362307 :         B: tokio_epoll_uring::BoundedBufMut + Send,
     705       362307 :     {
     706       563471 :         let file_guard = match self.lock_file().await {
     707       362307 :             Ok(file_guard) => file_guard,
     708            0 :             Err(e) => return (buf, Err(e)),
     709              :         };
     710              : 
     711       362307 :         observe_duration!(StorageIoOperation::Read, {
     712       362307 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     713       362307 :             if let Ok(size) = res {
     714       362305 :                 STORAGE_IO_SIZE
     715       362305 :                     .with_label_values(&[
     716       362305 :                         "read",
     717       362305 :                         &self.tenant_id,
     718       362305 :                         &self.shard_id,
     719       362305 :                         &self.timeline_id,
     720       362305 :                     ])
     721       362305 :                     .add(size as i64);
     722       362305 :             }
     723       362307 :             (buf, res)
     724              :         })
     725       362307 :     }
     726              : 
     727        40963 :     async fn write_at<B: IoBuf + Send>(
     728        40963 :         &self,
     729        40963 :         buf: Slice<B>,
     730        40963 :         offset: u64,
     731        40963 :         _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
     732        40963 :     ) -> (Slice<B>, Result<usize, Error>) {
     733        40963 :         let file_guard = match self.lock_file().await {
     734        40963 :             Ok(file_guard) => file_guard,
     735            0 :             Err(e) => return (buf, Err(e)),
     736              :         };
     737        40963 :         observe_duration!(StorageIoOperation::Write, {
     738        40963 :             let ((_file_guard, buf), result) =
     739        40963 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     740        40963 :             if let Ok(size) = result {
     741        40961 :                 STORAGE_IO_SIZE
     742        40961 :                     .with_label_values(&[
     743        40961 :                         "write",
     744        40961 :                         &self.tenant_id,
     745        40961 :                         &self.shard_id,
     746        40961 :                         &self.timeline_id,
     747        40961 :                     ])
     748        40961 :                     .add(size as i64);
     749        40961 :             }
     750        40963 :             (buf, result)
     751              :         })
     752        40963 :     }
     753              : }
     754              : 
     755              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     756       361873 : pub async fn read_exact_at_impl<B, F, Fut>(
     757       361873 :     buf: B,
     758       361873 :     mut offset: u64,
     759       361873 :     count: Option<usize>,
     760       361873 :     mut read_at: F,
     761       361873 : ) -> (B, std::io::Result<()>)
     762       361873 : where
     763       361873 :     B: IoBufMut + Send,
     764       361873 :     F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
     765       361873 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
     766       361873 : {
     767       361873 :     let mut buf: tokio_epoll_uring::Slice<B> = match count {
     768        30198 :         Some(count) => {
     769        30198 :             assert!(count <= buf.bytes_total());
     770        30198 :             assert!(count > 0);
     771        30198 :             buf.slice(..count) // may include uninitialized memory
     772              :         }
     773       331675 :         None => buf.slice_full(), // includes all the uninitialized memory
     774              :     };
     775              : 
     776       723748 :     while buf.bytes_total() != 0 {
     777              :         let res;
     778       744394 :         (buf, res) = read_at(buf, offset).await;
     779            0 :         match res {
     780            2 :             Ok(0) => break,
     781       361875 :             Ok(n) => {
     782       361875 :                 buf = buf.slice(n..);
     783       361875 :                 offset += n as u64;
     784       361875 :             }
     785            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     786            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     787              :         }
     788              :     }
     789              :     // NB: don't use `buf.is_empty()` here; it is from the
     790              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     791              :     // returned by it only covers the initialized portion of `buf`.
     792              :     // Whereas we're interested in ensuring that we filled the entire
     793              :     // buffer that the user passed in.
     794       361873 :     if buf.bytes_total() != 0 {
     795            2 :         (
     796            2 :             buf.into_inner(),
     797            2 :             Err(std::io::Error::new(
     798            2 :                 std::io::ErrorKind::UnexpectedEof,
     799            2 :                 "failed to fill whole buffer",
     800            2 :             )),
     801            2 :         )
     802              :     } else {
     803       361871 :         assert_eq!(buf.len(), buf.bytes_total());
     804       361871 :         (buf.into_inner(), Ok(()))
     805              :     }
     806       361873 : }
     807              : 
     808              : #[cfg(test)]
     809              : mod test_read_exact_at_impl {
     810              : 
     811              :     use std::{collections::VecDeque, sync::Arc};
     812              : 
     813              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     814              : 
     815              :     use super::read_exact_at_impl;
     816              : 
     817              :     struct Expectation {
     818              :         offset: u64,
     819              :         bytes_total: usize,
     820              :         result: std::io::Result<Vec<u8>>,
     821              :     }
     822              :     struct MockReadAt {
     823              :         expectations: VecDeque<Expectation>,
     824              :     }
     825              : 
     826              :     impl MockReadAt {
     827           14 :         async fn read_at(
     828           14 :             &mut self,
     829           14 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     830           14 :             offset: u64,
     831           14 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     832           14 :             let exp = self
     833           14 :                 .expectations
     834           14 :                 .pop_front()
     835           14 :                 .expect("read_at called but we have no expectations left");
     836           14 :             assert_eq!(exp.offset, offset);
     837           14 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     838           14 :             match exp.result {
     839           14 :                 Ok(bytes) => {
     840           14 :                     assert!(bytes.len() <= buf.bytes_total());
     841           14 :                     buf.put_slice(&bytes);
     842           14 :                     (buf, Ok(bytes.len()))
     843              :                 }
     844            0 :                 Err(e) => (buf, Err(e)),
     845              :             }
     846           14 :         }
     847              :     }
     848              : 
     849              :     impl Drop for MockReadAt {
     850           10 :         fn drop(&mut self) {
     851           10 :             assert_eq!(self.expectations.len(), 0);
     852           10 :         }
     853              :     }
     854              : 
     855              :     #[tokio::test]
     856            2 :     async fn test_basic() {
     857            2 :         let buf = Vec::with_capacity(5);
     858            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     859            2 :             expectations: VecDeque::from(vec![Expectation {
     860            2 :                 offset: 0,
     861            2 :                 bytes_total: 5,
     862            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     863            2 :             }]),
     864            2 :         }));
     865            2 :         let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     866            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     867            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     868            2 :         })
     869            2 :         .await;
     870            2 :         assert!(res.is_ok());
     871            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     872            2 :     }
     873              : 
     874              :     #[tokio::test]
     875            2 :     async fn test_with_count() {
     876            2 :         let buf = Vec::with_capacity(5);
     877            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     878            2 :             expectations: VecDeque::from(vec![Expectation {
     879            2 :                 offset: 0,
     880            2 :                 bytes_total: 3,
     881            2 :                 result: Ok(vec![b'a', b'b', b'c']),
     882            2 :             }]),
     883            2 :         }));
     884            2 : 
     885            2 :         let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
     886            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     887            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     888            2 :         })
     889            2 :         .await;
     890            2 :         assert!(res.is_ok());
     891            2 :         assert_eq!(buf, vec![b'a', b'b', b'c']);
     892            2 :     }
     893              : 
     894              :     #[tokio::test]
     895            2 :     async fn test_empty_buf_issues_no_syscall() {
     896            2 :         let buf = Vec::new();
     897            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     898            2 :             expectations: VecDeque::new(),
     899            2 :         }));
     900            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     901            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     902            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     903            2 :         })
     904            2 :         .await;
     905            2 :         assert!(res.is_ok());
     906            2 :     }
     907              : 
     908              :     #[tokio::test]
     909            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     910            2 :         let buf = Vec::with_capacity(4);
     911            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     912            2 :             expectations: VecDeque::from(vec![
     913            2 :                 Expectation {
     914            2 :                     offset: 0,
     915            2 :                     bytes_total: 4,
     916            2 :                     result: Ok(vec![b'a', b'b']),
     917            2 :                 },
     918            2 :                 Expectation {
     919            2 :                     offset: 2,
     920            2 :                     bytes_total: 2,
     921            2 :                     result: Ok(vec![b'c', b'd']),
     922            2 :                 },
     923            2 :             ]),
     924            2 :         }));
     925            4 :         let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     926            4 :             let mock_read_at = Arc::clone(&mock_read_at);
     927            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     928            4 :         })
     929            2 :         .await;
     930            2 :         assert!(res.is_ok());
     931            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     932            2 :     }
     933              : 
     934              :     #[tokio::test]
     935            2 :     async fn test_eof_before_buffer_full() {
     936            2 :         let buf = Vec::with_capacity(3);
     937            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     938            2 :             expectations: VecDeque::from(vec![
     939            2 :                 Expectation {
     940            2 :                     offset: 0,
     941            2 :                     bytes_total: 3,
     942            2 :                     result: Ok(vec![b'a']),
     943            2 :                 },
     944            2 :                 Expectation {
     945            2 :                     offset: 1,
     946            2 :                     bytes_total: 2,
     947            2 :                     result: Ok(vec![b'b']),
     948            2 :                 },
     949            2 :                 Expectation {
     950            2 :                     offset: 2,
     951            2 :                     bytes_total: 1,
     952            2 :                     result: Ok(vec![]),
     953            2 :                 },
     954            2 :             ]),
     955            2 :         }));
     956            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     957            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     958            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     959            6 :         })
     960            2 :         .await;
     961            2 :         let Err(err) = res else {
     962            2 :             panic!("should return an error");
     963            2 :         };
     964            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     965            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     966            2 :         // buffer contents on error are unspecified
     967            2 :     }
     968              : }
     969              : 
     970              : struct FileGuard {
     971              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
     972              : }
     973              : 
     974              : impl AsRef<OwnedFd> for FileGuard {
     975       406593 :     fn as_ref(&self) -> &OwnedFd {
     976       406593 :         // This unwrap is safe because we only create `FileGuard`s
     977       406593 :         // if we know that the file is Some.
     978       406593 :         self.slot_guard.file.as_ref().unwrap()
     979       406593 :     }
     980              : }
     981              : 
     982              : impl FileGuard {
     983              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     984       203349 :     fn with_std_file<F, R>(&self, with: F) -> R
     985       203349 :     where
     986       203349 :         F: FnOnce(&File) -> R,
     987       203349 :     {
     988       203349 :         // SAFETY:
     989       203349 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
     990       203349 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there is no `&mut`
     991       203349 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
     992       203349 :         let res = with(&file);
     993       203349 :         let _ = file.into_raw_fd();
     994       203349 :         res
     995       203349 :     }
     996              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     997            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
     998            4 :     where
     999            4 :         F: FnOnce(&mut File) -> R,
    1000            4 :     {
    1001            4 :         // SAFETY:
    1002            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
    1003            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
    1004            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1005            4 :         let res = with(&mut file);
    1006            4 :         let _ = file.into_raw_fd();
    1007            4 :         res
    1008            4 :     }
    1009              : }
    1010              : 
    1011              : impl tokio_epoll_uring::IoFd for FileGuard {
    1012       203240 :     unsafe fn as_fd(&self) -> RawFd {
    1013       203240 :         let owned_fd: &OwnedFd = self.as_ref();
    1014       203240 :         owned_fd.as_raw_fd()
    1015       203240 :     }
    1016              : }
    1017              : 
    1018              : #[cfg(test)]
    1019              : impl VirtualFile {
    1020        20200 :     pub(crate) async fn read_blk(
    1021        20200 :         &self,
    1022        20200 :         blknum: u32,
    1023        20200 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1024        20200 :         use crate::page_cache::PAGE_SZ;
    1025        20200 :         let buf = vec![0; PAGE_SZ];
    1026        20200 :         let buf = self
    1027        20200 :             .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
    1028        10255 :             .await?;
    1029        20200 :         Ok(crate::tenant::block_io::BlockLease::Vec(buf))
    1030        20200 :     }
    1031              : 
    1032          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
    1033          224 :         let mut tmp = vec![0; 128];
    1034              :         loop {
    1035              :             let res;
    1036          444 :             (tmp, res) = self.read_at(tmp, self.pos).await;
    1037            2 :             match res {
    1038          222 :                 Ok(0) => return Ok(()),
    1039          220 :                 Ok(n) => {
    1040          220 :                     self.pos += n as u64;
    1041          220 :                     buf.extend_from_slice(&tmp[..n]);
    1042          220 :                 }
    1043            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1044            2 :                 Err(e) => return Err(e),
    1045              :             }
    1046              :         }
    1047          224 :     }
    1048              : }
    1049              : 
    1050              : impl Drop for VirtualFile {
    1051              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1052         3809 :     fn drop(&mut self) {
    1053         3809 :         let handle = self.handle.get_mut();
    1054         3809 :         // Close the slot's file, but only if the slot's tag still matches ours.
    1055         3809 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1056         3809 :             if slot_guard.tag == tag {
    1057         3809 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1058         3809 :                 // Timed as the `Close` operation. Closes done on eviction are recorded as a
    1059         3809 :                 // separate "close-by-replace" operation, for comparison.
    1060         3809 :                 if let Some(fd) = slot_guard.file.take() {
    1061         3331 :                     STORAGE_IO_TIME_METRIC
    1062         3331 :                         .get(StorageIoOperation::Close)
    1063         3331 :                         .observe_closure_duration(|| drop(fd));
    1064         3331 :                 }
    1065         3809 :             }
    1066         3809 :         }
    1067         3809 : 
    1068         3809 :         // We don't have async drop so we cannot directly await the lock here.
    1069         3809 :         // Instead, first do a best-effort attempt at closing the underlying
    1070         3809 :         // file descriptor by using `try_write`, and if that fails, spawn
    1071         3809 :         // a tokio task to do it asynchronously: we just want it to be
    1072         3809 :         // cleaned up eventually.
    1073         3809 :         // Most of the time, the `try_write` should succeed though,
    1074         3809 :         // as we have `&mut self` access. In other words, if the slot
    1075         3809 :         // is still occupied by our file, there should be no access from
    1076         3809 :         // other I/O operations; the only other possible place to lock
    1077         3809 :         // the slot is the clock algorithm looking for free slots.
    1078         3809 :         let slot = &get_open_files().slots[handle.index];
    1079         3809 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1080         3809 :             clean_slot(slot, slot_guard, handle.tag);
    1081         3809 :         } else {
    1082            0 :             let tag = handle.tag; // copy the tag out so the spawned task does not borrow `self`
    1083            0 :             tokio::spawn(async move {
    1084            0 :                 let slot_guard = slot.inner.write().await;
    1085            0 :                 clean_slot(slot, slot_guard, tag);
    1086            0 :             });
    1087            0 :         };
    1088         3809 :     }
    1089              : }
    1090              : 
    1091              : impl OwnedAsyncWriter for VirtualFile { // adapter: forwards to the inherent `VirtualFile::write_all`
    1092              :     #[inline(always)]
    1093            3 :     async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1094            3 :         &mut self,
    1095            3 :         buf: B,
    1096            3 :         ctx: &RequestContext,
    1097            3 :     ) -> std::io::Result<(usize, B::Buf)> {
    1098            3 :         let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
    1099            3 :         res.map(move |v| (v, buf)) // `(buf, Result<v>)` becomes `Result<(v, buf)>`; on error the buffer is dropped
    1100            3 :     }
    1101              : }
    1102              : 
    1103              : impl OpenFiles {
    1104          126 :     fn new(num_slots: usize) -> OpenFiles { // build `num_slots` empty slots: tag 0, no file, not recently used
    1105          126 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1106         1260 :         for _ in 0..num_slots {
    1107         1260 :             let slot = Slot {
    1108         1260 :                 recently_used: AtomicBool::new(false),
    1109         1260 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1110         1260 :             };
    1111         1260 :             slots.push(slot);
    1112         1260 :         }
    1113              :         // `Box::leak` below never frees the slots; it yields a `'static` reference to them.
    1114          126 :         OpenFiles {
    1115          126 :             next: AtomicUsize::new(0),
    1116          126 :             slots: Box::leak(slots),
    1117          126 :         }
    1118          126 :     }
    1119              : }
    1120              : 
    1121              : /// Initialize the virtual file module. This must be called once at page
    1122              : /// server startup; a second call panics.
    1123              : ///
    1124              : /// Not compiled for tests: there, `get_open_files` initializes the slots lazily.
    1125              : #[cfg(not(test))]
    1126            0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
    1127            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { // `set` fails if a value was already stored
    1128            0 :         panic!("virtual_file::init called twice");
    1129            0 :     }
    1130            0 :     io_engine::init(engine);
    1131            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1132            0 : }
    1133              : 
    1134              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10; // slot count used by the lazy initialization in unit tests
    1135              : 
    1136              : // Get a handle to the global slots array (`OPEN_FILES`).
    1137       414897 : fn get_open_files() -> &'static OpenFiles {
    1138       414897 :     //
    1139       414897 :     // In unit tests, page server startup doesn't happen and no one calls
    1140       414897 :     // virtual_file::init(). Initialize it here, with a small array.
    1141       414897 :     //
    1142       414897 :     // This applies to the virtual file tests below, but all other unit
    1143       414897 :     // tests too, so the virtual file facility is always usable in
    1144       414897 :     // unit tests.
    1145       414897 :     //
    1146       414897 :     if cfg!(test) { // `cfg!` expands to a plain boolean, so both arms are compiled in every build
    1147       414897 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1148              :     } else {
    1149            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet") // panics if `init` has not run
    1150              :     }
    1151       414897 : }
    1152              : 
    1153              : #[cfg(test)]
    1154              : mod tests {
    1155              :     use crate::context::DownloadBehavior;
    1156              :     use crate::task_mgr::TaskKind;
    1157              : 
    1158              :     use super::*;
    1159              :     use rand::seq::SliceRandom;
    1160              :     use rand::thread_rng;
    1161              :     use rand::Rng;
    1162              :     use std::future::Future;
    1163              :     use std::io::Write;
    1164              :     use std::os::unix::fs::FileExt;
    1165              :     use std::sync::Arc;
    1166              : 
    1167              :     enum MaybeVirtualFile { // lets one test body drive either a `VirtualFile` or a plain `File`
    1168              :         VirtualFile(VirtualFile),
    1169              :         File(File),
    1170              :     }
    1171              : 
    1172              :     impl From<VirtualFile> for MaybeVirtualFile { // used by the crashsafe-overwrite tests in this module
    1173            6 :         fn from(vf: VirtualFile) -> Self {
    1174            6 :             MaybeVirtualFile::VirtualFile(vf)
    1175            6 :         }
    1176              :     }
    1177              : 
    1178              :     impl MaybeVirtualFile { // every method dispatches to the matching operation of the wrapped file
    1179          404 :         async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
    1180          404 :             match self {
    1181          202 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
    1182          202 :                 MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
    1183              :             }
    1184          404 :         }
    1185            8 :         async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1186            8 :             &self,
    1187            8 :             buf: B,
    1188            8 :             offset: u64,
    1189            8 :             ctx: &RequestContext,
    1190            8 :         ) -> Result<(), Error> {
    1191            8 :             match self {
    1192            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1193            4 :                     let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
    1194            4 :                     res
    1195              :                 }
    1196            4 :                 MaybeVirtualFile::File(file) => {
    1197            4 :                     let buf_len = buf.bytes_init(); // only the initialized prefix of the buffer is written
    1198            4 :                     if buf_len == 0 {
    1199            0 :                         return Ok(());
    1200            4 :                     }
    1201            4 :                     file.write_all_at(&buf.slice(0..buf_len), offset)
    1202              :                 }
    1203              :             }
    1204            8 :         }
    1205           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1206           36 :             match self {
    1207           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1208           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1209              :             }
    1210           36 :         }
    1211            8 :         async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1212            8 :             &mut self,
    1213            8 :             buf: B,
    1214            8 :             ctx: &RequestContext,
    1215            8 :         ) -> Result<(), Error> {
    1216            8 :             match self {
    1217            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1218            4 :                     let (_buf, res) = file.write_all(buf, ctx).await;
    1219            4 :                     res.map(|_| ()) // discard the success value; only success/failure matters here
    1220              :                 }
    1221            4 :                 MaybeVirtualFile::File(file) => {
    1222            4 :                     let buf_len = buf.bytes_init(); // only the initialized prefix of the buffer is written
    1223            4 :                     if buf_len == 0 {
    1224            0 :                         return Ok(());
    1225            4 :                     }
    1226            4 :                     file.write_all(&buf.slice(0..buf_len))
    1227              :                 }
    1228              :             }
    1229            8 :         }
    1230              : 
    1231              :         // Helper function to slurp the contents of a file, starting at the current position,
    1232              :         // into a string. Panics (in the VirtualFile arm) if the bytes are not valid UTF-8.
    1233          442 :         async fn read_string(&mut self) -> Result<String, Error> {
    1234          442 :             use std::io::Read;
    1235          442 :             let mut buf = String::new(); // only used by the `File` arm; the `VirtualFile` arm shadows it
    1236          442 :             match self {
    1237          224 :                 MaybeVirtualFile::VirtualFile(file) => {
    1238          224 :                     let mut buf = Vec::new();
    1239          226 :                     file.read_to_end(&mut buf).await?;
    1240          222 :                     return Ok(String::from_utf8(buf).unwrap());
    1241              :                 }
    1242          218 :                 MaybeVirtualFile::File(file) => {
    1243          218 :                     file.read_to_string(&mut buf)?;
    1244              :                 }
    1245              :             }
    1246          216 :             Ok(buf)
    1247          442 :         }
    1248              : 
    1249              :         // Helper function to slurp exactly `len` bytes at offset `pos` into a string
    1250          404 :         async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
    1251          404 :             let buf = vec![0; len];
    1252          404 :             let buf = self.read_exact_at(buf, pos).await?;
    1253          404 :             Ok(String::from_utf8(buf).unwrap()) // panics if the bytes are not valid UTF-8
    1254          404 :         }
    1255              :     }
    1256              : 
    1257              :     #[tokio::test]
    1258            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1259            2 :         // The real work is done in the test_files() helper function. This
    1260            2 :         // allows us to run the same set of tests against both a native File and
    1261            2 :         // a VirtualFile. We trust native Files and wouldn't need to test them,
    1262            2 :         // but this lets us verify that the operations return the same
    1263            2 :         // results with VirtualFiles as with native Files. (Except that with
    1264            2 :         // native files, you will run out of file descriptors if the ulimit
    1265            2 :         // is low enough.)
    1266          206 :         test_files("virtual_files", |path, open_options| async move {
    1267          206 :             let vf = VirtualFile::open_with_options(&path, &open_options).await?;
    1268          206 :             Ok(MaybeVirtualFile::VirtualFile(vf))
    1269          206 :         })
    1270          530 :         .await
    1271            2 :     }
    1272              : 
    1273              :     #[tokio::test]
    1274            2 :     async fn test_physical_files() -> anyhow::Result<()> { // same scenario as test_virtual_files, on plain `File`s
    1275          206 :         test_files("physical_files", |path, open_options| async move {
    1276          206 :             Ok(MaybeVirtualFile::File({
    1277          206 :                 let owned_fd = open_options.open(path.as_std_path()).await?;
    1278          206 :                 File::from(owned_fd) // convert the opened descriptor into a `File`
    1279          206 :             }))
    1280          206 :         })
    1281          104 :         .await
    1282            2 :     }
    1283              : 
    1284            4 :     async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
    1285            4 :     where
    1286            4 :         OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
    1287            4 :         FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
    1288            4 :     { // shared scenario; `openfunc` decides which file implementation is exercised
    1289            4 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    1290            4 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1291            4 :         std::fs::create_dir_all(&testdir)?;
    1292              : 
    1293            4 :         let path_a = testdir.join("file_a");
    1294            4 :         let mut file_a = openfunc(
    1295            4 :             path_a.clone(),
    1296            4 :             OpenOptions::new()
    1297            4 :                 .write(true)
    1298            4 :                 .create(true)
    1299            4 :                 .truncate(true)
    1300            4 :                 .to_owned(),
    1301            4 :         )
    1302            4 :         .await?;
    1303            4 :         file_a.write_all(b"foobar".to_vec(), &ctx).await?;
    1304              : 
    1305              :         // cannot read from a file opened in write-only mode
    1306            4 :         let _ = file_a.read_string().await.unwrap_err();
    1307              : 
    1308              :         // Re-open for reading (the shadowed write-only handle is only dropped at end of scope)
    1309            4 :         let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
    1310              : 
    1311              :         // cannot write to a file opened in read-only mode
    1312            4 :         let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
    1313            4 : 
    1314            4 :         // Try simple read
    1315            4 :         assert_eq!("foobar", file_a.read_string().await?);
    1316              : 
    1317              :         // It's positioned at the EOF now.
    1318            4 :         assert_eq!("", file_a.read_string().await?);
    1319              : 
    1320              :         // Test seeks.
    1321            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1322            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1323              : 
    1324            4 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1325            4 :         assert_eq!("ar", file_a.read_string().await?);
    1326              : 
    1327            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1328            4 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1329            4 :         assert_eq!("bar", file_a.read_string().await?);
    1330              : 
    1331            4 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1332            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1333              : 
    1334              :         // Test erroneous seeks to before byte 0
    1335            4 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1336            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1337            4 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1338            4 : 
    1339            4 :         // the erroneous seek should have left the position unchanged
    1340            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1341              : 
    1342              :         // Create another test file, and try the positioned (FileExt-style) functions on it.
    1343            4 :         let path_b = testdir.join("file_b");
    1344            4 :         let mut file_b = openfunc(
    1345            4 :             path_b.clone(),
    1346            4 :             OpenOptions::new()
    1347            4 :                 .read(true)
    1348            4 :                 .write(true)
    1349            4 :                 .create(true)
    1350            4 :                 .truncate(true)
    1351            4 :                 .to_owned(),
    1352            4 :         )
    1353            2 :         .await?;
    1354            4 :         file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
    1355            4 :         file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
    1356              : 
    1357            4 :         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
    1358              : 
    1359              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1360              :         // open the same file many times. The effect is the same.)
    1361              :         //
    1362              :         // leave file_a positioned at offset 1 before we start
    1363            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1364              : 
    1365            4 :         let mut vfiles = Vec::new();
    1366          404 :         for _ in 0..100 {
    1367          400 :             let mut vfile =
    1368          400 :                 openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
    1369          400 :             assert_eq!("FOOBAR", vfile.read_string().await?);
    1370          400 :             vfiles.push(vfile);
    1371              :         }
    1372              : 
    1373              :         // make sure we opened enough files to definitely cause evictions.
    1374            4 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1375              : 
    1376              :         // For a VirtualFile, 'file_a's underlying file descriptor should be closed by now.
    1377              :         // Read from it again anyway; we left the file positioned at offset 1 above.
    1378            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1379              : 
    1380              :         // Check that all the other FDs still work too. Use them in random order for
    1381              :         // good measure.
    1382            4 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1383          400 :         for vfile in vfiles.iter_mut() {
    1384          400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
    1385              :         }
    1386              : 
    1387            4 :         Ok(())
    1388            4 :     }
    1389              : 
    1390              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1391              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1392              :     /// VirtualFile from multiple threads concurrently.
    1393              :     #[tokio::test]
    1394            2 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1395            2 :         const SIZE: usize = 8 * 1024;
    1396            2 :         const VIRTUAL_FILES: usize = 100;
    1397            2 :         const THREADS: usize = 100;
    1398            2 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1399            2 : 
    1400            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1401            2 :         std::fs::create_dir_all(&testdir)?;
    1402            2 : 
    1403            2 :         // Create a test file.
    1404            2 :         let test_file_path = testdir.join("concurrency_test_file");
    1405            2 :         {
    1406            2 :             let file = File::create(&test_file_path)?;
    1407            2 :             file.write_all_at(&SAMPLE, 0)?;
    1408            2 :         }
    1409            2 : 
    1410            2 :         // Open the file many times.
    1411            2 :         let mut files = Vec::new();
    1412          202 :         for _ in 0..VIRTUAL_FILES {
    1413          200 :             let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
    1414          101 :                 .await?;
    1415          200 :             files.push(f);
    1416            2 :         }
    1417            2 :         let files = Arc::new(files);
    1418            2 : 
    1419            2 :         // Spawn one task per "thread" on a dedicated multi-threaded runtime, and use the virtual files concurrently in random order.
    1420            2 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1421            2 :             .worker_threads(THREADS)
    1422            2 :             .thread_name("test_vfile_concurrency thread")
    1423            2 :             .build()
    1424            2 :             .unwrap();
    1425            2 :         let mut hdls = Vec::new();
    1426          202 :         for _threadno in 0..THREADS {
    1427          200 :             let files = files.clone();
    1428          200 :             let hdl = rt.spawn(async move {
    1429          200 :                 let mut buf = vec![0u8; SIZE];
    1430          200 :                 let mut rng = rand::rngs::OsRng;
    1431       200000 :                 for _ in 1..1000 { // note: `1..1000` is 999 iterations per task
    1432       199800 :                     let f = &files[rng.gen_range(0..files.len())];
    1433       655859 :                     buf = f.read_exact_at(buf, 0).await.unwrap();
    1434       199800 :                     assert!(buf == SAMPLE);
    1435            2 :                 }
    1436          200 :             });
    1437          200 :             hdls.push(hdl);
    1438          200 :         }
    1439          202 :         for hdl in hdls {
    1440          200 :             hdl.await?;
    1441            2 :         }
    1442            2 :         std::mem::forget(rt); // NOTE(review): presumably avoids dropping a runtime inside async context — confirm
    1443            2 : 
    1444            2 :         Ok(())
    1445            2 :     }
    1446              : 
    1447              :     #[tokio::test]
    1448            2 :     async fn test_atomic_overwrite_basic() { // overwrite the same path twice; check contents and temp-file cleanup
    1449            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1450            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1451            2 : 
    1452            2 :         let path = testdir.join("myfile");
    1453            2 :         let tmp_path = testdir.join("myfile.tmp");
    1454            2 : 
    1455            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1456            2 :             .await
    1457            2 :             .unwrap();
    1458            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1459            2 :         let post = file.read_string().await.unwrap();
    1460            2 :         assert_eq!(post, "foo");
    1461            2 :         assert!(!tmp_path.exists()); // the temporary file must not be left behind
    1462            2 :         drop(file);
    1463            2 :         // Overwrite again with different contents.
    1464            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1465            2 :             .await
    1466            2 :             .unwrap();
    1467            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1468            2 :         let post = file.read_string().await.unwrap();
    1469            2 :         assert_eq!(post, "bar");
    1470            2 :         assert!(!tmp_path.exists());
    1471            2 :         drop(file);
    1472            2 :     }
    1473              : 
    1474              :     #[tokio::test]
    1475            2 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1476            2 :         let testdir =
    1477            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1478            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1479            2 : 
    1480            2 :         let path = testdir.join("myfile");
    1481            2 :         let tmp_path = testdir.join("myfile.tmp");
    1482            2 :         // Pre-create the temp file with junk contents before running the overwrite.
    1483            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1484            2 :         assert!(tmp_path.exists());
    1485            2 : 
    1486            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1487            2 :             .await
    1488            2 :             .unwrap();
    1489            2 : 
    1490            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1491            2 :         let post = file.read_string().await.unwrap();
    1492            2 :         assert_eq!(post, "foo"); // the target holds the new contents, not the junk
    1493            2 :         assert!(!tmp_path.exists()); // and the preexisting temp file is gone afterwards
    1494            2 :         drop(file);
    1495            2 :     }
    1496              : }
        

Generated by: LCOV version 2.1-beta