LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 92.3 % 914 844
Test Date: 2024-04-18 15:32:49 Functions: 91.0 % 189 172

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      14              : 
      15              : use crate::page_cache::PageWriteGuard;
      16              : use crate::tenant::TENANTS_SEGMENT_NAME;
      17              : use camino::{Utf8Path, Utf8PathBuf};
      18              : use once_cell::sync::OnceCell;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::fs::File;
      21              : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      22              : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
      23              : 
      24              : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
      25              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      26              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      27              : use tokio::time::Instant;
      28              : 
      29              : pub use pageserver_api::models::virtual_file as api;
      30              : pub(crate) mod io_engine;
      31              : pub use io_engine::feature_test as io_engine_feature_test;
      32              : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
      33              : mod metadata;
      34              : mod open_options;
      35              : pub(crate) use io_engine::IoEngineKind;
      36              : pub(crate) use metadata::Metadata;
      37              : pub(crate) use open_options::*;
      38              : 
      39              : #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
      40              : pub(crate) mod owned_buffers_io {
      41              :     //! Abstractions for IO with owned buffers.
      42              :     //!
      43              :     //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
      44              :     //! reason we need this abstraction.
      45              :     //!
      46              :     //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
      47              :     //! but for the time being we're proving out the primitives in the neon.git repo
      48              :     //! for faster iteration.
      49              : 
      50              :     pub(crate) mod write;
      51              :     pub(crate) mod util {
      52              :         pub(crate) mod size_tracking_writer;
      53              :     }
      54              : }
      55              : 
      56              : ///
      57              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      58              : /// the underlying file is closed if the system is low on file descriptors,
      59              : /// and re-opened when it's accessed again.
      60              : ///
      61              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      62              : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      63              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      64              : /// require a mutable reference, because they modify the "current position".
      65              : ///
      66              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      67              : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      68              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      69              : /// 'tag' field is used to detect whether the handle still is valid or not.
      70              : ///
      71              : #[derive(Debug)]
      72              : pub struct VirtualFile {
      73              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      74              :     /// might contain our File, or it may be empty, or it may contain a File that
      75              :     /// belongs to a different VirtualFile.
      76              :     handle: RwLock<SlotHandle>,
      77              : 
      78              :     /// Current file position
      79              :     pos: u64,
      80              : 
      81              :     /// File path and options to use to open it.
      82              :     ///
      83              :     /// Note: this only contains the options needed to re-open it. For example,
      84              :     /// if a new file is created, we only pass the create flag when it's initially
      85              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      86              :     /// storing it here.
      87              :     pub path: Utf8PathBuf,
      88              :     open_options: OpenOptions,
      89              : 
      90              :     // These are strings because we only use them for metrics, and those expect strings.
      91              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      92              :     // strings.
      93              :     tenant_id: String,
      94              :     shard_id: String,
      95              :     timeline_id: String,
      96              : }
      97              : 
      98              : #[derive(Debug, PartialEq, Clone, Copy)]
      99              : struct SlotHandle {
     100              :     /// Index into OPEN_FILES.slots
     101              :     index: usize,
     102              : 
     103              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
     104              :     /// been recycled and no longer contains the FD for this virtual file.
     105              :     tag: u64,
     106              : }
     107              : 
     108              : /// OPEN_FILES is the global array that holds the physical file descriptors that
     109              : /// are currently open. Each slot in the array is protected by a separate lock,
     110              : /// so that different files can be accessed independently. The lock must be held
     111              : /// in write mode to replace the slot with a different file, but a read mode
     112              : /// is enough to operate on the file, whether you're reading or writing to it.
     113              : ///
     114              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
     115              : /// the virtual_file::init() function. It must be called exactly once at page
     116              : /// server startup.
     117              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
     118              : 
     119              : struct OpenFiles {
     120              :     slots: &'static [Slot],
     121              : 
     122              :     /// clock arm for the clock algorithm
     123              :     next: AtomicUsize,
     124              : }
     125              : 
     126              : struct Slot {
     127              :     inner: RwLock<SlotInner>,
     128              : 
     129              :     /// has this file been used since last clock sweep?
     130              :     recently_used: AtomicBool,
     131              : }
     132              : 
     133              : struct SlotInner {
     134              :     /// Counter that's incremented every time a different file is stored here.
     135              :     /// To avoid the ABA problem.
     136              :     tag: u64,
     137              : 
     138              :     /// the underlying file
     139              :     file: Option<OwnedFd>,
     140              : }
     141              : 
     142              : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
     143              : struct PageWriteGuardBuf {
     144              :     page: PageWriteGuard<'static>,
     145              :     init_up_to: usize,
     146              : }
     147              : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
     148              : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
     149              : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
     150       111230 :     fn stable_ptr(&self) -> *const u8 {
     151       111230 :         self.page.as_ptr()
     152       111230 :     }
     153       333690 :     fn bytes_init(&self) -> usize {
     154       333690 :         self.init_up_to
     155       333690 :     }
     156       111230 :     fn bytes_total(&self) -> usize {
     157       111230 :         self.page.len()
     158       111230 :     }
     159              : }
     160              : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
     161              : // hence it's safe to hand out the `stable_mut_ptr()`.
     162              : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
     163       111230 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     164       111230 :         self.page.as_mut_ptr()
     165       111230 :     }
     166              : 
     167       111230 :     unsafe fn set_init(&mut self, pos: usize) {
     168       111230 :         assert!(pos <= self.page.len());
     169       111230 :         self.init_up_to = pos;
     170       111230 :     }
     171              : }
     172              : 
     173              : impl OpenFiles {
     174              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     175              :     ///
     176              :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
     177              :     /// 'recently_used' has been set. It's all ready for reuse.
     178       191391 :     async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     179       191391 :         //
     180       191391 :         // Run the clock algorithm to find a slot to replace.
     181       191391 :         //
     182       191391 :         let num_slots = self.slots.len();
     183       191391 :         let mut retries = 0;
     184              :         let mut slot;
     185              :         let mut slot_guard;
     186              :         let index;
     187      2335507 :         loop {
     188      2335507 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     189      2335507 :             slot = &self.slots[next];
     190      2335507 : 
     191      2335507 :             // If the recently_used flag on this slot is set, continue the clock
     192      2335507 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     193      2335507 :             // lock, also continue the clock sweep.
     194      2335507 :             //
     195      2335507 :             // We only continue in this manner for a while, though. If we loop
     196      2335507 :             // through the array twice without finding a victim, just pick the
     197      2335507 :             // next slot and wait until we can reuse it. This way, we avoid
     198      2335507 :             // spinning in the extreme case that all the slots are busy with an
     199      2335507 :             // I/O operation.
     200      2335507 :             if retries < num_slots * 2 {
     201      2247751 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     202      2040858 :                     if let Ok(guard) = slot.inner.try_write() {
     203       103635 :                         slot_guard = guard;
     204       103635 :                         index = next;
     205       103635 :                         break;
     206      1937223 :                     }
     207       206893 :                 }
     208      2144116 :                 retries += 1;
     209              :             } else {
     210        87756 :                 slot_guard = slot.inner.write().await;
     211        87756 :                 index = next;
     212        87756 :                 break;
     213              :             }
     214              :         }
     215              : 
     216              :         //
     217              :         // We now have the victim slot locked. If it was in use previously, close the
     218              :         // old file.
     219              :         //
     220       191391 :         if let Some(old_file) = slot_guard.file.take() {
     221       188469 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     222       188469 :             // distinguish the two.
     223       188469 :             STORAGE_IO_TIME_METRIC
     224       188469 :                 .get(StorageIoOperation::CloseByReplace)
     225       188469 :                 .observe_closure_duration(|| drop(old_file));
     226       188469 :         }
     227              : 
     228              :         // Prepare the slot for reuse and return it
     229       191391 :         slot_guard.tag += 1;
     230       191391 :         slot.recently_used.store(true, Ordering::Relaxed);
     231       191391 :         (
     232       191391 :             SlotHandle {
     233       191391 :                 index,
     234       191391 :                 tag: slot_guard.tag,
     235       191391 :             },
     236       191391 :             slot_guard,
     237       191391 :         )
     238       191391 :     }
     239              : }
     240              : 
     241              : /// Identify error types that should always terminate the process.  Other
     242              : /// error types may be eligible for retry.
     243            0 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
     244            0 :     use nix::errno::Errno::*;
     245            0 :     match e.raw_os_error().map(nix::errno::from_i32) {
     246              :         Some(EIO) => {
     247              :             // Terminate on EIO because we no longer trust the device to store
     248              :             // data safely, or to uphold persistence guarantees on fsync.
     249            0 :             true
     250              :         }
     251              :         Some(EROFS) => {
     252              :             // Terminate on EROFS because a filesystem is usually remounted
     253              :             // readonly when it has experienced some critical issue, so the same
     254              :             // logic as EIO applies.
     255            0 :             true
     256              :         }
     257              :         Some(EACCES) => {
     258              :             // Terminate on EACCES because we should always have permissions
     259              :             // for our own data dir: if we don't, then we can't do our job and
     260              :             // need administrative intervention to fix permissions.  Terminating
     261              :             // is the best way to make sure we stop cleanly rather than going
     262              :             // into infinite retry loops, and will make it clear to the outside
     263              :             // world that we need help.
     264            0 :             true
     265              :         }
     266              :         _ => {
     267              :             // Treat all other local file I/O errors as retryable.  This includes:
     268              :             // - ENOSPC: we stay up and wait for eviction to free some space
     269              :             // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
     270              :             // - WriteZero, Interrupted: these are used internally by VirtualFile
     271            0 :             false
     272              :         }
     273              :     }
     274            0 : }
     275              : 
     276              : /// Call this when the local filesystem gives us an error with an external
     277              : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
     278              : /// bad storage or bad configuration, and we can't fix that from inside
     279              : /// a running process.
     280            0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
     281            0 :     tracing::error!("Fatal I/O error: {e}: {context})");
     282            0 :     std::process::abort();
     283              : }
     284              : 
     285              : pub(crate) trait MaybeFatalIo<T> {
     286              :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
     287              :     fn fatal_err(self, context: &str) -> T;
     288              : }
     289              : 
     290              : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
     291              :     /// Terminate the process if the result is an error of a fatal type, else pass it through
     292              :     ///
     293              :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
     294              :     /// not on ENOSPC.
     295           22 :     fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
     296           22 :         if let Err(e) = &self {
     297            0 :             if is_fatal_io_error(e) {
     298            0 :                 on_fatal_io_error(e, context);
     299            0 :             }
     300           22 :         }
     301           22 :         self
     302           22 :     }
     303              : 
     304              :     /// Terminate the process on any I/O error.
     305              :     ///
     306              :     /// This is appropriate for reads on files that we know exist: they should always work.
     307         1514 :     fn fatal_err(self, context: &str) -> T {
     308         1514 :         match self {
     309         1514 :             Ok(v) => v,
     310            0 :             Err(e) => {
     311            0 :                 on_fatal_io_error(&e, context);
     312              :             }
     313              :         }
     314         1514 :     }
     315              : }
     316              : 
     317              : /// Observe duration for the given storage I/O operation
     318              : ///
     319              : /// Unlike `observe_closure_duration`, this supports async,
     320              : /// where "support" means that we measure wall clock time.
     321              : macro_rules! observe_duration {
     322              :     ($op:expr, $($body:tt)*) => {{
     323              :         let instant = Instant::now();
     324              :         let result = $($body)*;
     325              :         let elapsed = instant.elapsed().as_secs_f64();
     326              :         STORAGE_IO_TIME_METRIC
     327              :             .get($op)
     328              :             .observe(elapsed);
     329              :         result
     330              :     }}
     331              : }
     332              : 
     333              : macro_rules! with_file {
     334              :     ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
     335              :         let $ident = $this.lock_file().await?;
     336              :         observe_duration!($op, $($body)*)
     337              :     }};
     338              :     ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
     339              :         let mut $ident = $this.lock_file().await?;
     340              :         observe_duration!($op, $($body)*)
     341              :     }};
     342              : }
     343              : 
     344              : impl VirtualFile {
     345              :     /// Open a file in read-only mode. Like File::open.
     346         1400 :     pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     347         1400 :         Self::open_with_options(path, OpenOptions::new().read(true)).await
     348         1400 :     }
     349              : 
     350              :     /// Create a new file for writing. If the file exists, it will be truncated.
     351              :     /// Like File::create.
     352          879 :     pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     353          879 :         Self::open_with_options(
     354          879 :             path,
     355          879 :             OpenOptions::new().write(true).create(true).truncate(true),
     356          879 :         )
     357          463 :         .await
     358          879 :     }
     359              : 
     360              :     /// Open a file with given options.
     361              :     ///
     362              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     363              :     /// they will be applied also when the file is subsequently re-opened, not only
     364              :     /// on the first time. Make sure that's sane!
     365         3607 :     pub async fn open_with_options(
     366         3607 :         path: &Utf8Path,
     367         3607 :         open_options: &OpenOptions,
     368         3607 :     ) -> Result<VirtualFile, std::io::Error> {
     369         3607 :         let path_str = path.to_string();
     370         3607 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     371         3607 :         let (tenant_id, shard_id, timeline_id) =
     372         3607 :             if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     373         2407 :                 let tenant_shard_part = parts[parts.len() - 4];
     374         2407 :                 let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
     375         2407 :                     Ok(tenant_shard_id) => (
     376         2407 :                         tenant_shard_id.tenant_id.to_string(),
     377         2407 :                         format!("{}", tenant_shard_id.shard_slug()),
     378         2407 :                     ),
     379              :                     Err(_) => {
     380              :                         // Malformed path: this ID is just for observability, so tolerate it
     381              :                         // and pass through
     382            0 :                         (tenant_shard_part.to_string(), "*".to_string())
     383              :                     }
     384              :                 };
     385         2407 :                 (tenant_id, shard_id, parts[parts.len() - 2].to_string())
     386              :             } else {
     387         1200 :                 ("*".to_string(), "*".to_string(), "*".to_string())
     388              :             };
     389         3607 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
     390              : 
     391              :         // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
     392              :         // where our caller doesn't get to use the returned VirtualFile before its
     393              :         // slot gets re-used by someone else.
     394         3607 :         let file = observe_duration!(StorageIoOperation::Open, {
     395         3607 :             open_options.open(path.as_std_path()).await?
     396              :         });
     397              : 
     398              :         // Strip all options other than read and write.
     399              :         //
     400              :         // It would perhaps be nicer to check just for the read and write flags
     401              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     402              :         // only to set them.
     403         3607 :         let mut reopen_options = open_options.clone();
     404         3607 :         reopen_options.create(false);
     405         3607 :         reopen_options.create_new(false);
     406         3607 :         reopen_options.truncate(false);
     407         3607 : 
     408         3607 :         let vfile = VirtualFile {
     409         3607 :             handle: RwLock::new(handle),
     410         3607 :             pos: 0,
     411         3607 :             path: path.to_path_buf(),
     412         3607 :             open_options: reopen_options,
     413         3607 :             tenant_id,
     414         3607 :             shard_id,
     415         3607 :             timeline_id,
     416         3607 :         };
     417         3607 : 
     418         3607 :         // TODO: Under pressure, it's likely the slot will get re-used and
     419         3607 :         // the underlying file closed before the caller gets around to using it.
     420         3607 :         // => https://github.com/neondatabase/neon/issues/6065
     421         3607 :         slot_guard.file.replace(file);
     422         3607 : 
     423         3607 :         Ok(vfile)
     424         3607 :     }
     425              : 
     426              :     /// Async version of [`::utils::crashsafe::overwrite`].
     427              :     ///
     428              :     /// # NB:
     429              :     ///
     430              :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but,
     431              :     /// it did at an earlier time.
     432              :     /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
     433           28 :     pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
     434           28 :         final_path: Utf8PathBuf,
     435           28 :         tmp_path: Utf8PathBuf,
     436           28 :         content: B,
     437           28 :     ) -> std::io::Result<()> {
     438           28 :         // TODO: use tokio_epoll_uring if configured as `io_engine`.
     439           28 :         // See https://github.com/neondatabase/neon/issues/6663
     440           28 : 
     441           28 :         tokio::task::spawn_blocking(move || {
     442           28 :             let slice_storage;
     443           28 :             let content_len = content.bytes_init();
     444           28 :             let content = if content.bytes_init() > 0 {
     445           28 :                 slice_storage = Some(content.slice(0..content_len));
     446           28 :                 slice_storage.as_deref().expect("just set it to Some()")
     447              :             } else {
     448            0 :                 &[]
     449              :             };
     450           28 :             utils::crashsafe::overwrite(&final_path, &tmp_path, content)
     451           28 :         })
     452           28 :         .await
     453           28 :         .expect("blocking task is never aborted")
     454           28 :     }
     455              : 
     456              :     /// Call File::sync_all() on the underlying File.
     457         1703 :     pub async fn sync_all(&self) -> Result<(), Error> {
     458         1703 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     459         1703 :             let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
     460         1703 :             res
     461              :         })
     462         1703 :     }
     463              : 
     464              :     /// Call File::sync_data() on the underlying File.
     465            0 :     pub async fn sync_data(&self) -> Result<(), Error> {
     466            0 :         with_file!(self, StorageIoOperation::Fsync, |file_guard| {
     467            0 :             let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
     468            0 :             res
     469              :         })
     470            0 :     }
     471              : 
     472          962 :     pub async fn metadata(&self) -> Result<Metadata, Error> {
     473          962 :         with_file!(self, StorageIoOperation::Metadata, |file_guard| {
     474          962 :             let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
     475          962 :             res
     476              :         })
     477          962 :     }
     478              : 
     479              :     /// Helper function internal to `VirtualFile` that looks up the underlying File,
     480              :     /// opens it and evicts some other File if necessary. The passed parameter is
     481              :     /// assumed to be a function available for the physical `File`.
     482              :     ///
     483              :     /// We are doing it via a macro as Rust doesn't support async closures that
     484              :     /// take on parameters with lifetimes.
     485       491966 :     async fn lock_file(&self) -> Result<FileGuard, Error> {
     486       491966 :         let open_files = get_open_files();
     487              : 
     488       187784 :         let mut handle_guard = {
     489              :             // Read the cached slot handle, and see if the slot that it points to still
     490              :             // contains our File.
     491              :             //
     492              :             // We only need to hold the handle lock while we read the current handle. If
     493              :             // another thread closes the file and recycles the slot for a different file,
     494              :             // we will notice that the handle we read is no longer valid and retry.
     495       491966 :             let mut handle = *self.handle.read().await;
     496       591039 :             loop {
     497       591039 :                 // Check if the slot contains our File
     498       591039 :                 {
     499       591039 :                     let slot = &open_files.slots[handle.index];
     500       591039 :                     let slot_guard = slot.inner.read().await;
     501       591039 :                     if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
     502              :                         // Found a cached file descriptor.
     503       304182 :                         slot.recently_used.store(true, Ordering::Relaxed);
     504       304182 :                         return Ok(FileGuard { slot_guard });
     505       286857 :                     }
     506              :                 }
     507              : 
     508              :                 // The slot didn't contain our File. We will have to open it ourselves,
     509              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     510              :                 // that no other thread will try to concurrently open the same file.
     511       286857 :                 let handle_guard = self.handle.write().await;
     512              : 
     513              :                 // If another thread changed the handle while we were not holding the lock,
     514              :                 // then the handle might now be valid again. Loop back to retry.
     515       286857 :                 if *handle_guard != handle {
     516        99073 :                     handle = *handle_guard;
     517        99073 :                     continue;
     518       187784 :                 }
     519       187784 :                 break handle_guard;
     520              :             }
     521              :         };
     522              : 
     523              :         // We need to open the file ourselves. The handle in the VirtualFile is
     524              :         // now locked in write-mode. Find a free slot to put it in.
     525       187784 :         let (handle, mut slot_guard) = open_files.find_victim_slot().await;
     526              : 
     527              :         // Re-open the physical file.
     528              :         // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
     529              :         // case from StorageIoOperation::Open. This helps with identifying thrashing
     530              :         // of the virtual file descriptor cache.
     531       187784 :         let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
     532       187784 :             self.open_options.open(self.path.as_std_path()).await?
     533              :         });
     534              : 
     535              :         // Store the File in the slot and update the handle in the VirtualFile
     536              :         // to point to it.
     537       187784 :         slot_guard.file.replace(file);
     538       187784 : 
     539       187784 :         *handle_guard = handle;
     540       187784 : 
     541       187784 :         return Ok(FileGuard {
     542       187784 :             slot_guard: slot_guard.downgrade(),
     543       187784 :         });
     544       491966 :     }
     545              : 
     546            0 :     pub fn remove(self) {
     547            0 :         let path = self.path.clone();
     548            0 :         drop(self);
     549            0 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     550            0 :     }
     551              : 
     552         2898 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     553         2898 :         match pos {
     554         2888 :             SeekFrom::Start(offset) => {
     555         2888 :                 self.pos = offset;
     556         2888 :             }
     557            4 :             SeekFrom::End(offset) => {
     558            4 :                 self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
     559            4 :                     .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
     560              :             }
     561            6 :             SeekFrom::Current(offset) => {
     562            6 :                 let pos = self.pos as i128 + offset as i128;
     563            6 :                 if pos < 0 {
     564            2 :                     return Err(Error::new(
     565            2 :                         ErrorKind::InvalidInput,
     566            2 :                         "offset would be negative",
     567            2 :                     ));
     568            4 :                 }
     569            4 :                 if pos > u64::MAX as i128 {
     570            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     571            4 :                 }
     572            4 :                 self.pos = pos as u64;
     573              :             }
     574              :         }
     575         2894 :         Ok(self.pos)
     576         2898 :     }
     577              : 
     578       331432 :     pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
     579       331432 :     where
     580       331432 :         B: IoBufMut + Send,
     581       331432 :     {
     582       331432 :         let (buf, res) =
     583       669821 :             read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
     584       331432 :         res.map(|()| buf)
     585       331432 :     }
     586              : 
     587        19758 :     pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
     588        19758 :     where
     589        19758 :         B: IoBufMut + Send,
     590        19758 :     {
     591        19758 :         let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
     592        19758 :             self.read_at(buf, offset)
     593        19758 :         })
     594        10034 :         .await;
     595        19758 :         res.map(|()| buf)
     596        19758 :     }
     597              : 
     598              :     /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
     599       111230 :     pub async fn read_exact_at_page(
     600       111230 :         &self,
     601       111230 :         page: PageWriteGuard<'static>,
     602       111230 :         offset: u64,
     603       111230 :     ) -> Result<PageWriteGuard<'static>, Error> {
     604       111230 :         let buf = PageWriteGuardBuf {
     605       111230 :             page,
     606       111230 :             init_up_to: 0,
     607       111230 :         };
     608       111230 :         let res = self.read_exact_at(buf, offset).await;
     609       111230 :         res.map(|PageWriteGuardBuf { page, .. }| page)
     610       111230 :             .map_err(|e| Error::new(ErrorKind::Other, e))
     611       111230 :     }
     612              : 
     613              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     614        44752 :     pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     615        44752 :         &self,
     616        44752 :         buf: B,
     617        44752 :         mut offset: u64,
     618        44752 :     ) -> (B::Buf, Result<(), Error>) {
     619        44752 :         let buf_len = buf.bytes_init();
     620        44752 :         if buf_len == 0 {
     621            0 :             return (Slice::into_inner(buf.slice_full()), Ok(()));
     622        44752 :         }
     623        44752 :         let mut buf = buf.slice(0..buf_len);
     624        89504 :         while !buf.is_empty() {
     625              :             let res;
     626        44752 :             (buf, res) = self.write_at(buf, offset).await;
     627            0 :             match res {
     628              :                 Ok(0) => {
     629            0 :                     return (
     630            0 :                         Slice::into_inner(buf),
     631            0 :                         Err(Error::new(
     632            0 :                             std::io::ErrorKind::WriteZero,
     633            0 :                             "failed to write whole buffer",
     634            0 :                         )),
     635            0 :                     );
     636              :                 }
     637        44752 :                 Ok(n) => {
     638        44752 :                     buf = buf.slice(n..);
     639        44752 :                     offset += n as u64;
     640        44752 :                 }
     641            0 :                 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     642            0 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     643              :             }
     644              :         }
     645        44752 :         (Slice::into_inner(buf), Ok(()))
     646        44752 :     }
     647              : 
     648              :     /// Writes `buf.slice(0..buf.bytes_init())`.
     649              :     /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
     650              :     /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
     651              :     /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
     652        92943 :     pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
     653        92943 :         &mut self,
     654        92943 :         buf: B,
     655        92943 :     ) -> (B::Buf, Result<usize, Error>) {
     656        92943 :         let nbytes = buf.bytes_init();
     657        92943 :         if nbytes == 0 {
     658           32 :             return (Slice::into_inner(buf.slice_full()), Ok(0));
     659        92911 :         }
     660        92911 :         let mut buf = buf.slice(0..nbytes);
     661       185820 :         while !buf.is_empty() {
     662              :             let res;
     663        92911 :             (buf, res) = self.write(buf).await;
     664            2 :             match res {
     665              :                 Ok(0) => {
     666            0 :                     return (
     667            0 :                         Slice::into_inner(buf),
     668            0 :                         Err(Error::new(
     669            0 :                             std::io::ErrorKind::WriteZero,
     670            0 :                             "failed to write whole buffer",
     671            0 :                         )),
     672            0 :                     );
     673              :                 }
     674        92909 :                 Ok(n) => {
     675        92909 :                     buf = buf.slice(n..);
     676        92909 :                 }
     677            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     678            2 :                 Err(e) => return (Slice::into_inner(buf), Err(e)),
     679              :             }
     680              :         }
     681        92909 :         (Slice::into_inner(buf), Ok(nbytes))
     682        92943 :     }
     683              : 
     684        92911 :     async fn write<B: IoBuf + Send>(
     685        92911 :         &mut self,
     686        92911 :         buf: Slice<B>,
     687        92911 :     ) -> (Slice<B>, Result<usize, std::io::Error>) {
     688        92911 :         let pos = self.pos;
     689        92911 :         let (buf, res) = self.write_at(buf, pos).await;
     690        92911 :         let n = match res {
     691        92909 :             Ok(n) => n,
     692            2 :             Err(e) => return (buf, Err(e)),
     693              :         };
     694        92909 :         self.pos += n as u64;
     695        92909 :         (buf, Ok(n))
     696        92911 :     }
     697              : 
     698       351634 :     pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
     699       351634 :     where
     700       351634 :         B: tokio_epoll_uring::BoundedBufMut + Send,
     701       351634 :     {
     702       504194 :         let file_guard = match self.lock_file().await {
     703       351634 :             Ok(file_guard) => file_guard,
     704            0 :             Err(e) => return (buf, Err(e)),
     705              :         };
     706              : 
     707       351634 :         observe_duration!(StorageIoOperation::Read, {
     708       351634 :             let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
     709       351634 :             if let Ok(size) = res {
     710       351632 :                 STORAGE_IO_SIZE
     711       351632 :                     .with_label_values(&[
     712       351632 :                         "read",
     713       351632 :                         &self.tenant_id,
     714       351632 :                         &self.shard_id,
     715       351632 :                         &self.timeline_id,
     716       351632 :                     ])
     717       351632 :                     .add(size as i64);
     718       351632 :             }
     719       351634 :             (buf, res)
     720              :         })
     721       351634 :     }
     722              : 
     723       137663 :     async fn write_at<B: IoBuf + Send>(
     724       137663 :         &self,
     725       137663 :         buf: Slice<B>,
     726       137663 :         offset: u64,
     727       137663 :     ) -> (Slice<B>, Result<usize, Error>) {
     728       137663 :         let file_guard = match self.lock_file().await {
     729       137663 :             Ok(file_guard) => file_guard,
     730            0 :             Err(e) => return (buf, Err(e)),
     731              :         };
     732       137663 :         observe_duration!(StorageIoOperation::Write, {
     733       137663 :             let ((_file_guard, buf), result) =
     734       137663 :                 io_engine::get().write_at(file_guard, offset, buf).await;
     735       137663 :             if let Ok(size) = result {
     736       137661 :                 STORAGE_IO_SIZE
     737       137661 :                     .with_label_values(&[
     738       137661 :                         "write",
     739       137661 :                         &self.tenant_id,
     740       137661 :                         &self.shard_id,
     741       137661 :                         &self.timeline_id,
     742       137661 :                     ])
     743       137661 :                     .add(size as i64);
     744       137661 :             }
     745       137663 :             (buf, result)
     746              :         })
     747       137663 :     }
     748              : }
     749              : 
     750              : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     751       351200 : pub async fn read_exact_at_impl<B, F, Fut>(
     752       351200 :     buf: B,
     753       351200 :     mut offset: u64,
     754       351200 :     count: Option<usize>,
     755       351200 :     mut read_at: F,
     756       351200 : ) -> (B, std::io::Result<()>)
     757       351200 : where
     758       351200 :     B: IoBufMut + Send,
     759       351200 :     F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
     760       351200 :     Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
     761       351200 : {
     762       351200 :     let mut buf: tokio_epoll_uring::Slice<B> = match count {
     763        19760 :         Some(count) => {
     764        19760 :             assert!(count <= buf.bytes_total());
     765        19760 :             assert!(count > 0);
     766        19760 :             buf.slice(..count) // may include uninitialized memory
     767              :         }
     768       331440 :         None => buf.slice_full(), // includes all the uninitialized memory
     769              :     };
     770              : 
     771       702402 :     while buf.bytes_total() != 0 {
     772              :         let res;
     773       679855 :         (buf, res) = read_at(buf, offset).await;
     774            0 :         match res {
     775            2 :             Ok(0) => break,
     776       351202 :             Ok(n) => {
     777       351202 :                 buf = buf.slice(n..);
     778       351202 :                 offset += n as u64;
     779       351202 :             }
     780            0 :             Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     781            0 :             Err(e) => return (buf.into_inner(), Err(e)),
     782              :         }
     783              :     }
     784              :     // NB: don't use `buf.is_empty()` here; it is from the
     785              :     // `impl Deref for Slice { Target = [u8] }`; the &[u8]
     786              :     // returned by it only covers the initialized portion of `buf`.
     787              :     // Whereas we're interested in ensuring that we filled the entire
     788              :     // buffer that the user passed in.
     789       351200 :     if buf.bytes_total() != 0 {
     790            2 :         (
     791            2 :             buf.into_inner(),
     792            2 :             Err(std::io::Error::new(
     793            2 :                 std::io::ErrorKind::UnexpectedEof,
     794            2 :                 "failed to fill whole buffer",
     795            2 :             )),
     796            2 :         )
     797              :     } else {
     798       351198 :         assert_eq!(buf.len(), buf.bytes_total());
     799       351198 :         (buf.into_inner(), Ok(()))
     800              :     }
     801       351200 : }
     802              : 
     803              : #[cfg(test)]
     804              : mod test_read_exact_at_impl {
     805              : 
     806              :     use std::{collections::VecDeque, sync::Arc};
     807              : 
     808              :     use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
     809              : 
     810              :     use super::read_exact_at_impl;
     811              : 
     812              :     struct Expectation {
     813              :         offset: u64,
     814              :         bytes_total: usize,
     815              :         result: std::io::Result<Vec<u8>>,
     816              :     }
     817              :     struct MockReadAt {
     818              :         expectations: VecDeque<Expectation>,
     819              :     }
     820              : 
     821              :     impl MockReadAt {
     822           14 :         async fn read_at(
     823           14 :             &mut self,
     824           14 :             mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
     825           14 :             offset: u64,
     826           14 :         ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
     827           14 :             let exp = self
     828           14 :                 .expectations
     829           14 :                 .pop_front()
     830           14 :                 .expect("read_at called but we have no expectations left");
     831           14 :             assert_eq!(exp.offset, offset);
     832           14 :             assert_eq!(exp.bytes_total, buf.bytes_total());
     833           14 :             match exp.result {
     834           14 :                 Ok(bytes) => {
     835           14 :                     assert!(bytes.len() <= buf.bytes_total());
     836           14 :                     buf.put_slice(&bytes);
     837           14 :                     (buf, Ok(bytes.len()))
     838              :                 }
     839            0 :                 Err(e) => (buf, Err(e)),
     840              :             }
     841           14 :         }
     842              :     }
     843              : 
     844              :     impl Drop for MockReadAt {
     845           10 :         fn drop(&mut self) {
     846           10 :             assert_eq!(self.expectations.len(), 0);
     847           10 :         }
     848              :     }
     849              : 
     850              :     #[tokio::test]
     851            2 :     async fn test_basic() {
     852            2 :         let buf = Vec::with_capacity(5);
     853            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     854            2 :             expectations: VecDeque::from(vec![Expectation {
     855            2 :                 offset: 0,
     856            2 :                 bytes_total: 5,
     857            2 :                 result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
     858            2 :             }]),
     859            2 :         }));
     860            2 :         let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     861            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     862            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     863            2 :         })
     864            2 :         .await;
     865            2 :         assert!(res.is_ok());
     866            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
     867            2 :     }
     868              : 
     869              :     #[tokio::test]
     870            2 :     async fn test_with_count() {
     871            2 :         let buf = Vec::with_capacity(5);
     872            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     873            2 :             expectations: VecDeque::from(vec![Expectation {
     874            2 :                 offset: 0,
     875            2 :                 bytes_total: 3,
     876            2 :                 result: Ok(vec![b'a', b'b', b'c']),
     877            2 :             }]),
     878            2 :         }));
     879            2 : 
     880            2 :         let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
     881            2 :             let mock_read_at = Arc::clone(&mock_read_at);
     882            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     883            2 :         })
     884            2 :         .await;
     885            2 :         assert!(res.is_ok());
     886            2 :         assert_eq!(buf, vec![b'a', b'b', b'c']);
     887            2 :     }
     888              : 
     889              :     #[tokio::test]
     890            2 :     async fn test_empty_buf_issues_no_syscall() {
     891            2 :         let buf = Vec::new();
     892            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     893            2 :             expectations: VecDeque::new(),
     894            2 :         }));
     895            2 :         let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     896            0 :             let mock_read_at = Arc::clone(&mock_read_at);
     897            2 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     898            2 :         })
     899            2 :         .await;
     900            2 :         assert!(res.is_ok());
     901            2 :     }
     902              : 
     903              :     #[tokio::test]
     904            2 :     async fn test_two_read_at_calls_needed_until_buf_filled() {
     905            2 :         let buf = Vec::with_capacity(4);
     906            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     907            2 :             expectations: VecDeque::from(vec![
     908            2 :                 Expectation {
     909            2 :                     offset: 0,
     910            2 :                     bytes_total: 4,
     911            2 :                     result: Ok(vec![b'a', b'b']),
     912            2 :                 },
     913            2 :                 Expectation {
     914            2 :                     offset: 2,
     915            2 :                     bytes_total: 2,
     916            2 :                     result: Ok(vec![b'c', b'd']),
     917            2 :                 },
     918            2 :             ]),
     919            2 :         }));
     920            4 :         let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     921            4 :             let mock_read_at = Arc::clone(&mock_read_at);
     922            4 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     923            4 :         })
     924            2 :         .await;
     925            2 :         assert!(res.is_ok());
     926            2 :         assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
     927            2 :     }
     928              : 
     929              :     #[tokio::test]
     930            2 :     async fn test_eof_before_buffer_full() {
     931            2 :         let buf = Vec::with_capacity(3);
     932            2 :         let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
     933            2 :             expectations: VecDeque::from(vec![
     934            2 :                 Expectation {
     935            2 :                     offset: 0,
     936            2 :                     bytes_total: 3,
     937            2 :                     result: Ok(vec![b'a']),
     938            2 :                 },
     939            2 :                 Expectation {
     940            2 :                     offset: 1,
     941            2 :                     bytes_total: 2,
     942            2 :                     result: Ok(vec![b'b']),
     943            2 :                 },
     944            2 :                 Expectation {
     945            2 :                     offset: 2,
     946            2 :                     bytes_total: 1,
     947            2 :                     result: Ok(vec![]),
     948            2 :                 },
     949            2 :             ]),
     950            2 :         }));
     951            6 :         let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
     952            6 :             let mock_read_at = Arc::clone(&mock_read_at);
     953            6 :             async move { mock_read_at.lock().await.read_at(buf, offset).await }
     954            6 :         })
     955            2 :         .await;
     956            2 :         let Err(err) = res else {
     957            2 :             panic!("should return an error");
     958            2 :         };
     959            2 :         assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
     960            2 :         assert_eq!(format!("{err}"), "failed to fill whole buffer");
     961            2 :         // buffer contents on error are unspecified
     962            2 :     }
     963              : }
     964              : 
     965              : struct FileGuard {
     966              :     slot_guard: RwLockReadGuard<'static, SlotInner>,
     967              : }
     968              : 
     969              : impl AsRef<OwnedFd> for FileGuard {
     970       491966 :     fn as_ref(&self) -> &OwnedFd {
     971       491966 :         // This unwrap is safe because we only create `FileGuard`s
     972       491966 :         // if we know that the file is Some.
     973       491966 :         self.slot_guard.file.as_ref().unwrap()
     974       491966 :     }
     975              : }
     976              : 
     977              : impl FileGuard {
     978              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     979       245952 :     fn with_std_file<F, R>(&self, with: F) -> R
     980       245952 :     where
     981       245952 :         F: FnOnce(&File) -> R,
     982       245952 :     {
     983       245952 :         // SAFETY:
     984       245952 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
     985       245952 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
     986       245952 :         let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
     987       245952 :         let res = with(&file);
     988       245952 :         let _ = file.into_raw_fd();
     989       245952 :         res
     990       245952 :     }
     991              :     /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
     992            4 :     fn with_std_file_mut<F, R>(&mut self, with: F) -> R
     993            4 :     where
     994            4 :         F: FnOnce(&mut File) -> R,
     995            4 :     {
     996            4 :         // SAFETY:
     997            4 :         // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
     998            4 :         // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
     999            4 :         let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
    1000            4 :         let res = with(&mut file);
    1001            4 :         let _ = file.into_raw_fd();
    1002            4 :         res
    1003            4 :     }
    1004              : }
    1005              : 
    1006              : impl tokio_epoll_uring::IoFd for FileGuard {
    1007       246010 :     unsafe fn as_fd(&self) -> RawFd {
    1008       246010 :         let owned_fd: &OwnedFd = self.as_ref();
    1009       246010 :         owned_fd.as_raw_fd()
    1010       246010 :     }
    1011              : }
    1012              : 
    1013              : #[cfg(test)]
    1014              : impl VirtualFile {
    1015        20200 :     pub(crate) async fn read_blk(
    1016        20200 :         &self,
    1017        20200 :         blknum: u32,
    1018        20200 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
    1019        20200 :         use crate::page_cache::PAGE_SZ;
    1020        20200 :         let buf = vec![0; PAGE_SZ];
    1021        20200 :         let buf = self
    1022        20200 :             .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
    1023        10256 :             .await?;
    1024        20200 :         Ok(crate::tenant::block_io::BlockLease::Vec(buf))
    1025        20200 :     }
    1026              : 
    1027          224 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
    1028          224 :         let mut tmp = vec![0; 128];
    1029              :         loop {
    1030              :             let res;
    1031          444 :             (tmp, res) = self.read_at(tmp, self.pos).await;
    1032            2 :             match res {
    1033          222 :                 Ok(0) => return Ok(()),
    1034          220 :                 Ok(n) => {
    1035          220 :                     self.pos += n as u64;
    1036          220 :                     buf.extend_from_slice(&tmp[..n]);
    1037          220 :                 }
    1038            2 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
    1039            2 :                 Err(e) => return Err(e),
    1040              :             }
    1041              :         }
    1042          224 :     }
    1043              : }
    1044              : 
    1045              : impl Drop for VirtualFile {
    1046              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
    1047         3317 :     fn drop(&mut self) {
    1048         3317 :         let handle = self.handle.get_mut();
    1049         3317 : 
                                // Releases `slot` if it still carries our `tag`: clears the
                                // `recently_used` flag and closes the descriptor (if any) while
                                // recording the close duration. A different tag presumably means
                                // the slot has since been reused for another file, so it is left alone.
    1050         3317 :         fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    1051         3317 :             if slot_guard.tag == tag {
    1052         3317 :                 slot.recently_used.store(false, Ordering::Relaxed);
    1053         3317 :                 // Recorded as a plain "close"; closes done on eviction are recorded under
    1054         3317 :                 // the separate "close-by-replace" operation, for comparison.
    1055         3317 :                 if let Some(fd) = slot_guard.file.take() {
    1056         2814 :                     STORAGE_IO_TIME_METRIC
    1057         2814 :                         .get(StorageIoOperation::Close)
    1058         2814 :                         .observe_closure_duration(|| drop(fd));
    1059         2814 :                 }
    1060         3317 :             }
    1061         3317 :         }
    1062         3317 : 
    1063         3317 :         // We don't have async drop so we cannot directly await the lock here.
    1064         3317 :         // Instead, first do a best-effort attempt at closing the underlying
    1065         3317 :         // file descriptor by using `try_write`, and if that fails, spawn
    1066         3317 :         // a tokio task to do it asynchronously: we just want it to be
    1067         3317 :         // cleaned up eventually.
    1068         3317 :         // Most of the time, the `try_write` should succeed though,
    1069         3317 :         // as we have `&mut self` access. In other words, if the slot
    1070         3317 :         // is still occupied by our file, there should be no access from
    1071         3317 :         // other I/O operations; the only other possible place to lock
    1072         3317 :         // the slot is the clock algorithm looking for free slots.
    1073         3317 :         let slot = &get_open_files().slots[handle.index];
    1074         3317 :         if let Ok(slot_guard) = slot.inner.try_write() {
    1075         3317 :             clean_slot(slot, slot_guard, handle.tag);
    1076         3317 :         } else {
                                    // Copy the tag out: `handle` borrows from `self` and cannot move
                                    // into the task, whereas `slot` borrows from the `&'static`
                                    // `OpenFiles` returned by `get_open_files()`.
                                    // NOTE(review): `tokio::spawn` needs a runtime context; presumably
                                    // every VirtualFile is dropped inside one -- confirm.
    1077            0 :             let tag = handle.tag;
    1078            0 :             tokio::spawn(async move {
    1079            0 :                 let slot_guard = slot.inner.write().await;
    1080            0 :                 clean_slot(slot, slot_guard, tag);
    1081            0 :             });
    1082            0 :         };
    1083         3317 :     }
    1084              : }
    1085              : 
    1086              : impl OpenFiles {
                            /// Builds the slot table with `num_slots` empty slots.
                            ///
                            /// Every slot starts with `recently_used == false`, tag 0 and no open file.
                            /// The vector is intentionally leaked with `Box::leak`, so the slots are
                            /// never freed. `next` starts at 0 (presumably the clock hand used when
                            /// looking for a slot to reuse -- confirm against the slot search code).
    1087          120 :     fn new(num_slots: usize) -> OpenFiles {
    1088          120 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
    1089         1200 :         for _ in 0..num_slots {
    1090         1200 :             let slot = Slot {
    1091         1200 :                 recently_used: AtomicBool::new(false),
    1092         1200 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
    1093         1200 :             };
    1094         1200 :             slots.push(slot);
    1095         1200 :         }
    1096              : 
    1097          120 :         OpenFiles {
    1098          120 :             next: AtomicUsize::new(0),
                                    // Leak on purpose: the slot array must outlive every user.
    1099          120 :             slots: Box::leak(slots),
    1100          120 :         }
    1101          120 :     }
    1102              : }
    1103              : 
    1104              : ///
    1105              : /// Initialize the virtual file module. This must be called once at page
    1106              : /// server startup.
    1107              : ///
                        /// Allocates `num_slots` file descriptor slots, selects the I/O engine and
                        /// publishes `num_slots` through the `virtual_file_descriptor_cache::SIZE_MAX`
                        /// metric. Not compiled in test builds (`#[cfg(not(test))]`).
                        ///
                        /// # Panics
                        ///
                        /// Panics with "virtual_file::init called twice" if the global slot table
                        /// has already been set.
                        ///
    1108              : #[cfg(not(test))]
    1109            0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
    1110            0 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
    1111            0 :         panic!("virtual_file::init called twice");
    1112            0 :     }
    1113            0 :     io_engine::init(engine);
    1114            0 :     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
    1115            0 : }
    1116              : 
                        /// Number of slots used when the slot table is created lazily in test
                        /// builds. Kept small so that tests opening many files force slot reuse.
    1117              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
    1118              : 
    1119              : // Get a handle to the global slots array.
                        //
                        // In test builds the table is created on first use with
                        // TEST_MAX_FILE_DESCRIPTORS slots. In all other builds this panics with
                        // "virtual_file::init not called yet" if OPEN_FILES has not been set.
    1120       498890 : fn get_open_files() -> &'static OpenFiles {
    1121       498890 :     //
    1122       498890 :     // In unit tests, page server startup doesn't happen and no one calls
    1123       498890 :     // virtual_file::init(). Initialize it here, with a small array.
    1124       498890 :     //
    1125       498890 :     // This applies to the virtual file tests below, but all other unit
    1126       498890 :     // tests too, so the virtual file facility is always usable in
    1127       498890 :     // unit tests.
    1128       498890 :     //
    1129       498890 :     if cfg!(test) {
    1130       498890 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
    1131              :     } else {
    1132            0 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
    1133              :     }
    1134       498890 : }
    1135              : 
    1136              : #[cfg(test)]
    1137              : mod tests {
    1138              :     use super::*;
    1139              :     use rand::seq::SliceRandom;
    1140              :     use rand::thread_rng;
    1141              :     use rand::Rng;
    1142              :     use std::future::Future;
    1143              :     use std::io::Write;
    1144              :     use std::os::unix::fs::FileExt;
    1145              :     use std::sync::Arc;
    1146              : 
                            /// Test-only wrapper over either a `VirtualFile` or a plain `std::fs::File`,
                            /// so that one test body can be run against both implementations.
    1147              :     enum MaybeVirtualFile {
    1148              :         VirtualFile(VirtualFile),
    1149              :         File(File),
    1150              :     }
    1151              : 
                            /// Wraps a `VirtualFile` in the `MaybeVirtualFile::VirtualFile` variant.
    1152              :     impl From<VirtualFile> for MaybeVirtualFile {
    1153            6 :         fn from(vf: VirtualFile) -> Self {
    1154            6 :             MaybeVirtualFile::VirtualFile(vf)
    1155            6 :         }
    1156              :     }
    1157              : 
                            /// Uniform I/O helpers that forward to whichever backend the variant holds.
    1158              :     impl MaybeVirtualFile {
                                /// Fills `buf` completely from `offset` and returns the same buffer.
                                /// Does not use or move the file position.
    1159          404 :         async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
    1160          404 :             match self {
    1161          203 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
                                        // `FileExt::read_exact_at` borrows the buffer; hand it back on success
                                        // so both arms have the same owned-buffer shape.
    1162          202 :                 MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
    1163              :             }
    1164          404 :         }
                                /// Writes the initialized part of `buf` at `offset`; the buffer is
                                /// consumed and not returned to the caller.
    1165            8 :         async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1166            8 :             &self,
    1167            8 :             buf: B,
    1168            8 :             offset: u64,
    1169            8 :         ) -> Result<(), Error> {
    1170            8 :             match self {
    1171            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1172            4 :                     let (_buf, res) = file.write_all_at(buf, offset).await;
    1173            4 :                     res
    1174              :                 }
    1175            4 :                 MaybeVirtualFile::File(file) => {
    1176            4 :                     let buf_len = buf.bytes_init();
                                            // NOTE(review): empty buffers are short-circuited, presumably
                                            // because `slice(0..0)` is not accepted by `BoundedBuf` -- confirm.
    1177            4 :                     if buf_len == 0 {
    1178            0 :                         return Ok(());
    1179            4 :                     }
    1180            4 :                     file.write_all_at(&buf.slice(0..buf_len), offset)
    1181              :                 }
    1182              :             }
    1183            8 :         }
                                /// Moves the file position and returns the value reported by the backend.
    1184           36 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
    1185           36 :             match self {
    1186           18 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
    1187           18 :                 MaybeVirtualFile::File(file) => file.seek(pos),
    1188              :             }
    1189           36 :         }
                                /// Writes the initialized part of `buf` at the current file position.
    1190            8 :         async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
    1191            8 :             &mut self,
    1192            8 :             buf: B,
    1193            8 :         ) -> Result<(), Error> {
    1194            8 :             match self {
    1195            4 :                 MaybeVirtualFile::VirtualFile(file) => {
    1196            4 :                     let (_buf, res) = file.write_all(buf).await;
                                            // The success payload of `VirtualFile::write_all` is not needed here.
    1197            4 :                     res.map(|_| ())
    1198              :                 }
    1199            4 :                 MaybeVirtualFile::File(file) => {
    1200            4 :                     let buf_len = buf.bytes_init();
    1201            4 :                     if buf_len == 0 {
    1202            0 :                         return Ok(());
    1203            4 :                     }
    1204            4 :                     file.write_all(&buf.slice(0..buf_len))
    1205              :                 }
    1206              :             }
    1207            8 :         }
    1208              : 
    1209              :         // Helper function to slurp contents of a file, starting at the current position,
    1210              :         // into a string
                                //
                                // The VirtualFile arm panics (via `unwrap`) if the bytes are not valid UTF-8;
                                // the File arm reports that through the `Result` of `read_to_string`.
    1211          442 :         async fn read_string(&mut self) -> Result<String, Error> {
    1212          442 :             use std::io::Read;
    1213          442 :             let mut buf = String::new();
    1214          442 :             match self {
    1215          224 :                 MaybeVirtualFile::VirtualFile(file) => {
                                            // Shadows the outer `String`: this arm reads raw bytes first.
    1216          224 :                     let mut buf = Vec::new();
    1217          226 :                     file.read_to_end(&mut buf).await?;
    1218          222 :                     return Ok(String::from_utf8(buf).unwrap());
    1219              :                 }
    1220          218 :                 MaybeVirtualFile::File(file) => {
    1221          218 :                     file.read_to_string(&mut buf)?;
    1222              :                 }
    1223              :             }
    1224          216 :             Ok(buf)
    1225          442 :         }
    1226              : 
    1227              :         // Helper function to slurp a portion of a file into a string
                                //
                                // Reads exactly `len` bytes starting at `pos`; panics (via `unwrap`) if they
                                // are not valid UTF-8.
    1228          404 :         async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
    1229          404 :             let buf = vec![0; len];
    1230          404 :             let buf = self.read_exact_at(buf, pos).await?;
    1231          404 :             Ok(String::from_utf8(buf).unwrap())
    1232          404 :         }
    1233              :     }
    1234              : 
                            /// Runs the shared `test_files` scenario with files opened through
                            /// `VirtualFile::open_with_options`.
    1235              :     #[tokio::test]
    1236            2 :     async fn test_virtual_files() -> anyhow::Result<()> {
    1237            2 :         // The real work is done in the test_files() helper function. This
    1238            2 :         // allows us to run the same set of tests against a native File, and
    1239            2 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
    1240            2 :         // but this allows us to verify that the operations return the same
    1241            2 :         // results with VirtualFiles as with native Files. (Except that with
    1242            2 :         // native files, you will run out of file descriptors if the ulimit
    1243            2 :         // is low enough.)
    1244          206 :         test_files("virtual_files", |path, open_options| async move {
    1245          206 :             let vf = VirtualFile::open_with_options(&path, &open_options).await?;
    1246          206 :             Ok(MaybeVirtualFile::VirtualFile(vf))
    1247          412 :         })
    1248          531 :         .await
    1249            2 :     }
    1250              : 
                            /// Runs the shared `test_files` scenario with plain `std::fs::File`s, opened
                            /// through the same `OpenOptions`, as the reference behaviour.
    1251              :     #[tokio::test]
    1252            2 :     async fn test_physical_files() -> anyhow::Result<()> {
    1253          206 :         test_files("physical_files", |path, open_options| async move {
    1254          206 :             Ok(MaybeVirtualFile::File({
                                        // `OpenOptions::open` yields an owned descriptor; turn it into a `File`.
    1255          206 :                 let owned_fd = open_options.open(path.as_std_path()).await?;
    1256          206 :                 File::from(owned_fd)
    1257            2 :             }))
    1258          412 :         })
    1259          104 :         .await
    1260            2 :     }
    1261              : 
                            /// Shared test scenario, parameterized over how files are opened.
                            ///
                            /// `openfunc` receives a path and `OpenOptions` and resolves to a
                            /// `MaybeVirtualFile`. `testname` names the per-test directory. The scenario
                            /// covers: access-mode errors, sequential reads, seeking (including invalid
                            /// seeks), positioned reads and writes, and finally keeping 100 extra handles
                            /// open at once, which is far more than `TEST_MAX_FILE_DESCRIPTORS`.
    1262            4 :     async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
    1263            4 :     where
    1264            4 :         OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
    1265            4 :         FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
    1266            4 :     {
    1267            4 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
    1268            4 :         std::fs::create_dir_all(&testdir)?;
    1269              : 
    1270            4 :         let path_a = testdir.join("file_a");
    1271            4 :         let mut file_a = openfunc(
    1272            4 :             path_a.clone(),
    1273            4 :             OpenOptions::new()
    1274            4 :                 .write(true)
    1275            4 :                 .create(true)
    1276            4 :                 .truncate(true)
    1277            4 :                 .to_owned(),
    1278            4 :         )
    1279            4 :         .await?;
    1280            4 :         file_a.write_all(b"foobar".to_vec()).await?;
    1281              : 
    1282              :         // cannot read from a file opened in write-only mode
    1283            4 :         let _ = file_a.read_string().await.unwrap_err();
    1284              : 
    1285              :         // Close the file and re-open for reading
                                // (rebinding `file_a` drops the write-only handle)
    1286            4 :         let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
    1287              : 
    1288              :         // cannot write to a file opened in read-only mode
    1289            4 :         let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();
    1290            4 : 
    1291            4 :         // Try simple read
    1292            4 :         assert_eq!("foobar", file_a.read_string().await?);
    1293              : 
    1294              :         // It's positioned at the EOF now.
    1295            4 :         assert_eq!("", file_a.read_string().await?);
    1296              : 
    1297              :         // Test seeks.
    1298            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1299            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1300              : 
    1301            4 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
    1302            4 :         assert_eq!("ar", file_a.read_string().await?);
    1303              : 
    1304            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1305            4 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
    1306            4 :         assert_eq!("bar", file_a.read_string().await?);
    1307              : 
                                // The previous read left the position at EOF (6), so -5 lands on 1.
    1308            4 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
    1309            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1310              : 
    1311              :         // Test erroneous seeks to before byte 0
    1312            4 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
    1313            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1314            4 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
    1315            4 : 
    1316            4 :         // the erroneous seek should have left the position unchanged
    1317            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1318              : 
    1319              :         // Create another test file, and try FileExt functions on it.
    1320            4 :         let path_b = testdir.join("file_b");
    1321            4 :         let mut file_b = openfunc(
    1322            4 :             path_b.clone(),
    1323            4 :             OpenOptions::new()
    1324            4 :                 .read(true)
    1325            4 :                 .write(true)
    1326            4 :                 .create(true)
    1327            4 :                 .truncate(true)
    1328            4 :                 .to_owned(),
    1329            4 :         )
    1330            2 :         .await?;
                                // Written out of order on purpose: the second half first, then the first.
    1331            4 :         file_b.write_all_at(b"BAR".to_vec(), 3).await?;
    1332            4 :         file_b.write_all_at(b"FOO".to_vec(), 0).await?;
    1333              : 
    1334            4 :         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
    1335              : 
    1336              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
    1337              :         // open the same file many times. The effect is the same.)
    1338              :         //
    1339              :         // leave file_a positioned at offset 1 before we start
    1340            4 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
    1341              : 
    1342            4 :         let mut vfiles = Vec::new();
    1343          404 :         for _ in 0..100 {
    1344          400 :             let mut vfile =
    1345          400 :                 openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
    1346          400 :             assert_eq!("FOOBAR", vfile.read_string().await?);
                                    // Keep every handle alive so they all stay open simultaneously.
    1347          400 :             vfiles.push(vfile);
    1348              :         }
    1349              : 
    1350              :         // make sure we opened enough files to definitely cause evictions.
    1351            4 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
    1352              : 
    1353              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
    1354              :         // from it again. We left the file positioned at offset 1 above.
    1355            4 :         assert_eq!("oobar", file_a.read_string().await?);
    1356              : 
    1357              :         // Check that all the other FDs still work too. Use them in random order for
    1358              :         // good measure.
    1359            4 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
    1360          400 :         for vfile in vfiles.iter_mut() {
    1361          400 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
    1362              :         }
    1363              : 
    1364            4 :         Ok(())
    1365            4 :     }
    1366              : 
    1367              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
    1368              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
    1369              :     /// VirtualFile from multiple threads concurrently.
                            ///
                            /// The readers run as tasks on a dedicated multi-thread runtime with `THREADS`
                            /// worker threads; each task does 999 full-file reads from randomly chosen
                            /// handles and checks the content against `SAMPLE`.
    1370              :     #[tokio::test]
    1371            2 :     async fn test_vfile_concurrency() -> Result<(), Error> {
    1372            2 :         const SIZE: usize = 8 * 1024;
    1373            2 :         const VIRTUAL_FILES: usize = 100;
    1374            2 :         const THREADS: usize = 100;
    1375            2 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
    1376            2 : 
    1377            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    1378            2 :         std::fs::create_dir_all(&testdir)?;
    1379            2 : 
    1380            2 :         // Create a test file.
    1381            2 :         let test_file_path = testdir.join("concurrency_test_file");
    1382            2 :         {
    1383            2 :             let file = File::create(&test_file_path)?;
    1384            2 :             file.write_all_at(&SAMPLE, 0)?;
    1385            2 :         }
    1386            2 : 
    1387            2 :         // Open the file many times.
    1388            2 :         let mut files = Vec::new();
    1389          202 :         for _ in 0..VIRTUAL_FILES {
    1390          200 :             let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
    1391          101 :                 .await?;
    1392          200 :             files.push(f);
    1393            2 :         }
                                // Shared read-only between all tasks; `read_exact_at` only needs `&VirtualFile`.
    1394            2 :         let files = Arc::new(files);
    1395            2 : 
    1396            2 :         // Launch many threads, and use the virtual files concurrently in random order.
    1397            2 :         let rt = tokio::runtime::Builder::new_multi_thread()
    1398            2 :             .worker_threads(THREADS)
    1399            2 :             .thread_name("test_vfile_concurrency thread")
    1400            2 :             .build()
    1401            2 :             .unwrap();
    1402            2 :         let mut hdls = Vec::new();
    1403          202 :         for _threadno in 0..THREADS {
    1404          200 :             let files = files.clone();
    1405          200 :             let hdl = rt.spawn(async move {
    1406          200 :                 let mut buf = vec![0u8; SIZE];
    1407          200 :                 let mut rng = rand::rngs::OsRng;
                                        // `1..1000` is exclusive at the top: 999 reads per task.
    1408       200000 :                 for _ in 1..1000 {
    1409       199800 :                     let f = &files[rng.gen_range(0..files.len())];
                                            // The buffer is passed by value and returned, so it is reused.
    1410       597035 :                     buf = f.read_exact_at(buf, 0).await.unwrap();
    1411       199800 :                     assert!(buf == SAMPLE);
    1412            2 :                 }
    1413          200 :             });
    1414          200 :             hdls.push(hdl);
    1415          200 :         }
    1416          202 :         for hdl in hdls {
    1417          200 :             hdl.await?;
    1418            2 :         }
                                // NOTE(review): the runtime is leaked instead of dropped, presumably because
                                // dropping a tokio runtime from inside an async context panics -- confirm.
    1419            2 :         std::mem::forget(rt);
    1420            2 : 
    1421            2 :         Ok(())
    1422            2 :     }
    1423              : 
                            /// `crashsafe_overwrite` creates the target when it does not exist yet and
                            /// replaces its content on a second call; in both cases the temporary path
                            /// must be gone afterwards.
    1424              :     #[tokio::test]
    1425            2 :     async fn test_atomic_overwrite_basic() {
    1426            2 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
    1427            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1428            2 : 
    1429            2 :         let path = testdir.join("myfile");
    1430            2 :         let tmp_path = testdir.join("myfile.tmp");
    1431            2 : 
                                // First call: the target is created with the given content.
    1432            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1433            2 :             .await
    1434            2 :             .unwrap();
    1435            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1436            2 :         let post = file.read_string().await.unwrap();
    1437            2 :         assert_eq!(post, "foo");
    1438            2 :         assert!(!tmp_path.exists());
    1439            2 :         drop(file);
    1440            2 : 
                                // Second call: the existing content is replaced.
    1441            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
    1442            2 :             .await
    1443            2 :             .unwrap();
    1444            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1445            2 :         let post = file.read_string().await.unwrap();
    1446            2 :         assert_eq!(post, "bar");
    1447            2 :         assert!(!tmp_path.exists());
    1448            2 :         drop(file);
    1449            2 :     }
    1450              : 
                            /// `crashsafe_overwrite` must succeed even when a stale file already exists
                            /// at the temporary path, and must not leave that file behind.
    1451              :     #[tokio::test]
    1452            2 :     async fn test_atomic_overwrite_preexisting_tmp() {
    1453            2 :         let testdir =
    1454            2 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
    1455            2 :         std::fs::create_dir_all(&testdir).unwrap();
    1456            2 : 
    1457            2 :         let path = testdir.join("myfile");
    1458            2 :         let tmp_path = testdir.join("myfile.tmp");
    1459            2 : 
                                // Simulate leftovers at the temporary path.
    1460            2 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
    1461            2 :         assert!(tmp_path.exists());
    1462            2 : 
    1463            2 :         VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
    1464            2 :             .await
    1465            2 :             .unwrap();
    1466            2 : 
                                // Only the new content is visible; none of the junk leaked into the target.
    1467            2 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
    1468            2 :         let post = file.read_string().await.unwrap();
    1469            2 :         assert_eq!(post, "foo");
    1470            2 :         assert!(!tmp_path.exists());
    1471            2 :         drop(file);
    1472            2 :     }
    1473              : }
        

Generated by: LCOV version 2.1-beta