LCOV - differential code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.3 % 595 537 58 537
Current Date: 2023-10-19 02:04:12 Functions: 90.7 % 108 98 10 2 96 2
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! VirtualFile is like a normal File, but it's not bound directly to
       3                 : //! a file descriptor. Instead, the file is opened when it's read from,
       4                 : //! and if too many files are open globally in the system, least-recently
       5                 : //! used ones are closed.
       6                 : //!
       7                 : //! To track which files have been recently used, we use the clock algorithm
       8                 : //! with a 'recently_used' flag on each slot.
       9                 : //!
      10                 : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11                 : //! src/backend/storage/file/fd.c
      12                 : //!
      13                 : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
      14                 : use crate::tenant::TENANTS_SEGMENT_NAME;
      15                 : use camino::{Utf8Path, Utf8PathBuf};
      16                 : use once_cell::sync::OnceCell;
      17                 : use std::fs::{self, File, OpenOptions};
      18                 : use std::io::{Error, ErrorKind, Seek, SeekFrom};
      19                 : use std::os::unix::fs::FileExt;
      20                 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      21                 : use std::sync::{RwLock, RwLockWriteGuard};
      22                 : 
      23                 : ///
      24                 : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      25                 : /// the underlying file is closed if the system is low on file descriptors,
      26                 : /// and re-opened when it's accessed again.
      27                 : ///
      28                 : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      29                 : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
      30                 : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      31                 : /// require a mutable reference, because they modify the "current position".
      32                 : ///
      33                 : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      34                 : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
      35                 : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      36                 : /// 'tag' field is used to detect whether the handle still is valid or not.
      37                 : ///
      38 UBC           0 : #[derive(Debug)]
      39                 : pub struct VirtualFile {
      40                 :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      41                 :     /// might contain our File, or it may be empty, or it may contain a File that
      42                 :     /// belongs to a different VirtualFile.
      43                 :     handle: RwLock<SlotHandle>,
      44                 : 
      45                 :     /// Current file position
      46                 :     pos: u64,
      47                 : 
      48                 :     /// File path and options to use to open it.
      49                 :     ///
      50                 :     /// Note: this only contains the options needed to re-open it. For example,
      51                 :     /// if a new file is created, we only pass the create flag when it's initially
      52                 :     /// opened, in the VirtualFile::create() function, and strip the flag before
      53                 :     /// storing it here.
      54                 :     pub path: Utf8PathBuf,
      55                 :     open_options: OpenOptions,
      56                 : 
      57                 :     // These are strings because we only use them for metrics, and those expect strings.
      58                 :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      59                 :     // strings.
      60                 :     tenant_id: String,
      61                 :     timeline_id: String,
      62                 : }
      63                 : 
      64 CBC      165845 : #[derive(Debug, PartialEq, Clone, Copy)]
      65                 : struct SlotHandle {
      66                 :     /// Index into OPEN_FILES.slots
      67                 :     index: usize,
      68                 : 
      69                 :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
      70                 :     /// been recycled and no longer contains the FD for this virtual file.
      71                 :     tag: u64,
      72                 : }
      73                 : 
      74                 : /// OPEN_FILES is the global array that holds the physical file descriptors that
      75                 : /// are currently open. Each slot in the array is protected by a separate lock,
      76                 : /// so that different files can be accessed independently. The lock must be held
      77                 : /// in write mode to replace the slot with a different file, but a read mode
      78                 : /// is enough to operate on the file, whether you're reading or writing to it.
      79                 : ///
      80                 : /// OPEN_FILES starts in uninitialized state, and it's initialized by
      81                 : /// the virtual_file::init() function. It must be called exactly once at page
      82                 : /// server startup.
      83                 : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
      84                 : 
      85                 : struct OpenFiles {
      86                 :     slots: &'static [Slot],
      87                 : 
      88                 :     /// clock arm for the clock algorithm
      89                 :     next: AtomicUsize,
      90                 : }
      91                 : 
      92                 : struct Slot {
      93                 :     inner: RwLock<SlotInner>,
      94                 : 
      95                 :     /// has this file been used since last clock sweep?
      96                 :     recently_used: AtomicBool,
      97                 : }
      98                 : 
      99                 : struct SlotInner {
     100                 :     /// Counter that's incremented every time a different file is stored here.
     101                 :     /// To avoid the ABA problem.
     102                 :     tag: u64,
     103                 : 
     104                 :     /// the underlying file
     105                 :     file: Option<File>,
     106                 : }
     107                 : 
     108                 : impl OpenFiles {
     109                 :     /// Find a slot to use, evicting an existing file descriptor if needed.
     110                 :     ///
     111                 :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     112                 :     /// and 'recently_used' has been set. It's all ready for reuse.
     113          177524 :     fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     114          177524 :         //
     115          177524 :         // Run the clock algorithm to find a slot to replace.
     116          177524 :         //
     117          177524 :         let num_slots = self.slots.len();
     118          177524 :         let mut retries = 0;
     119                 :         let mut slot;
     120                 :         let mut slot_guard;
     121                 :         let index;
     122          401239 :         loop {
     123          401239 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     124          401239 :             slot = &self.slots[next];
     125          401239 : 
     126          401239 :             // If the recently_used flag on this slot is set, continue the clock
     127          401239 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     128          401239 :             // lock, also continue the clock sweep.
     129          401239 :             //
     130          401239 :             // We only continue in this manner for a while, though. If we loop
     131          401239 :             // through the array twice without finding a victim, just pick the
     132          401239 :             // next slot and wait until we can reuse it. This way, we avoid
     133          401239 :             // spinning in the extreme case that all the slots are busy with an
     134          401239 :             // I/O operation.
     135          401239 :             if retries < num_slots * 2 {
     136          400446 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     137          235917 :                     if let Ok(guard) = slot.inner.try_write() {
     138          176731 :                         slot_guard = guard;
     139          176731 :                         index = next;
     140          176731 :                         break;
     141           59186 :                     }
     142          164529 :                 }
     143          223715 :                 retries += 1;
     144                 :             } else {
     145             793 :                 slot_guard = slot.inner.write().unwrap();
     146             793 :                 index = next;
     147             793 :                 break;
     148                 :             }
     149                 :         }
     150                 : 
     151                 :         //
     152                 :         // We now have the victim slot locked. If it was in use previously, close the
     153                 :         // old file.
     154                 :         //
     155          177524 :         if let Some(old_file) = slot_guard.file.take() {
     156          129751 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     157          129751 :             // distinguish the two.
     158          129751 :             STORAGE_IO_TIME_METRIC
     159          129751 :                 .get(StorageIoOperation::CloseByReplace)
     160          129751 :                 .observe_closure_duration(|| drop(old_file));
     161          129751 :         }
     162                 : 
     163                 :         // Prepare the slot for reuse and return it
     164          177524 :         slot_guard.tag += 1;
     165          177524 :         slot.recently_used.store(true, Ordering::Relaxed);
     166          177524 :         (
     167          177524 :             SlotHandle {
     168          177524 :                 index,
     169          177524 :                 tag: slot_guard.tag,
     170          177524 :             },
     171          177524 :             slot_guard,
     172          177524 :         )
     173          177524 :     }
     174                 : }
     175                 : 
     176 UBC           0 : #[derive(Debug, thiserror::Error)]
     177                 : pub enum CrashsafeOverwriteError {
     178                 :     #[error("final path has no parent dir")]
     179                 :     FinalPathHasNoParentDir,
     180                 :     #[error("remove tempfile")]
     181                 :     RemovePreviousTempfile(#[source] std::io::Error),
     182                 :     #[error("create tempfile")]
     183                 :     CreateTempfile(#[source] std::io::Error),
     184                 :     #[error("write tempfile")]
     185                 :     WriteContents(#[source] std::io::Error),
     186                 :     #[error("sync tempfile")]
     187                 :     SyncTempfile(#[source] std::io::Error),
     188                 :     #[error("rename tempfile to final path")]
     189                 :     RenameTempfileToFinalPath(#[source] std::io::Error),
     190                 :     #[error("open final path parent dir")]
     191                 :     OpenFinalPathParentDir(#[source] std::io::Error),
     192                 :     #[error("sync final path parent dir")]
     193                 :     SyncFinalPathParentDir(#[source] std::io::Error),
     194                 : }
     195                 : impl CrashsafeOverwriteError {
     196                 :     /// Returns true iff the new contents are durably stored.
     197               0 :     pub fn are_new_contents_durable(&self) -> bool {
     198               0 :         match self {
     199               0 :             Self::FinalPathHasNoParentDir => false,
     200               0 :             Self::RemovePreviousTempfile(_) => false,
     201               0 :             Self::CreateTempfile(_) => false,
     202               0 :             Self::WriteContents(_) => false,
     203               0 :             Self::SyncTempfile(_) => false,
     204               0 :             Self::RenameTempfileToFinalPath(_) => false,
     205               0 :             Self::OpenFinalPathParentDir(_) => false,
     206               0 :             Self::SyncFinalPathParentDir(_) => true,
     207                 :         }
     208               0 :     }
     209                 : }
     210                 : 
     211                 : impl VirtualFile {
     212                 :     /// Open a file in read-only mode. Like File::open.
     213 CBC       14824 :     pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     214           14824 :         Self::open_with_options(path, OpenOptions::new().read(true)).await
     215           14824 :     }
     216                 : 
     217                 :     /// Create a new file for writing. If the file exists, it will be truncated.
     218                 :     /// Like File::create.
     219           15740 :     pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
     220           15740 :         Self::open_with_options(
     221           15740 :             path,
     222           15740 :             OpenOptions::new().write(true).create(true).truncate(true),
     223           15740 :         )
     224 UBC           0 :         .await
     225 CBC       15740 :     }
     226                 : 
     227                 :     /// Open a file with given options.
     228                 :     ///
     229                 :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     230                 :     /// they will be applied also when the file is subsequently re-opened, not only
     231                 :     /// on the first time. Make sure that's sane!
     232           54741 :     pub async fn open_with_options(
     233           54741 :         path: &Utf8Path,
     234           54741 :         open_options: &OpenOptions,
     235           54741 :     ) -> Result<VirtualFile, std::io::Error> {
     236           54741 :         let path_str = path.to_string();
     237           54741 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     238           54741 :         let tenant_id;
     239           54741 :         let timeline_id;
     240           54741 :         if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     241           46271 :             tenant_id = parts[parts.len() - 4].to_string();
     242           46271 :             timeline_id = parts[parts.len() - 2].to_string();
     243           46271 :         } else {
     244            8470 :             tenant_id = "*".to_string();
     245            8470 :             timeline_id = "*".to_string();
     246            8470 :         }
     247           54741 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot();
     248                 : 
     249           54741 :         let file = STORAGE_IO_TIME_METRIC
     250           54741 :             .get(StorageIoOperation::Open)
     251           54741 :             .observe_closure_duration(|| open_options.open(path))?;
     252                 : 
     253                 :         // Strip all options other than read and write.
     254                 :         //
     255                 :         // It would perhaps be nicer to check just for the read and write flags
     256                 :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     257                 :         // only to set them.
     258           54741 :         let mut reopen_options = open_options.clone();
     259           54741 :         reopen_options.create(false);
     260           54741 :         reopen_options.create_new(false);
     261           54741 :         reopen_options.truncate(false);
     262           54741 : 
     263           54741 :         let vfile = VirtualFile {
     264           54741 :             handle: RwLock::new(handle),
     265           54741 :             pos: 0,
     266           54741 :             path: path.to_path_buf(),
     267           54741 :             open_options: reopen_options,
     268           54741 :             tenant_id,
     269           54741 :             timeline_id,
     270           54741 :         };
     271           54741 : 
     272           54741 :         slot_guard.file.replace(file);
     273           54741 : 
     274           54741 :         Ok(vfile)
     275           54741 :     }
     276                 : 
     277                 :     /// Writes a file to the specified `final_path` in a crash-safe fashion.
     278                 :     ///
     279                 :     /// The file is first written to the specified tmp_path, and in a second
     280                 :     /// step, the tmp path is renamed to the final path. As renames are
     281                 :     /// atomic, a crash during the write operation will never leave behind a
     282                 :     /// partially written file.
     283            7444 :     pub async fn crashsafe_overwrite(
     284            7444 :         final_path: &Utf8Path,
     285            7444 :         tmp_path: &Utf8Path,
     286            7444 :         content: &[u8],
     287            7444 :     ) -> Result<(), CrashsafeOverwriteError> {
     288            7444 :         let Some(final_path_parent) = final_path.parent() else {
     289 UBC           0 :             return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
     290                 :         };
     291 CBC        7444 :         match std::fs::remove_file(tmp_path) {
     292               1 :             Ok(()) => {}
     293            7443 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     294 UBC           0 :             Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
     295                 :         }
     296 CBC        7444 :         let mut file = Self::open_with_options(
     297            7444 :             tmp_path,
     298            7444 :             OpenOptions::new()
     299            7444 :                 .write(true)
     300            7444 :                 // Use `create_new` so that, if we race with ourselves or something else,
     301            7444 :                 // we bail out instead of causing damage.
     302            7444 :                 .create_new(true),
     303            7444 :         )
     304 UBC           0 :         .await
     305 CBC        7444 :         .map_err(CrashsafeOverwriteError::CreateTempfile)?;
     306            7444 :         file.write_all(content)
     307 UBC           0 :             .await
     308 CBC        7444 :             .map_err(CrashsafeOverwriteError::WriteContents)?;
     309            7444 :         file.sync_all()
     310 UBC           0 :             .await
     311 CBC        7444 :             .map_err(CrashsafeOverwriteError::SyncTempfile)?;
     312            7444 :         drop(file); // before the rename, that's important!
     313            7444 :                     // renames are atomic
     314            7444 :         std::fs::rename(tmp_path, final_path)
     315            7444 :             .map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
     316                 :         // Only open final path parent dirfd now, so that this operation only
     317                 :         // ever holds one VirtualFile fd at a time.  That's important because
     318                 :         // the current `find_victim_slot` impl might pick the same slot for both
     319                 :         // VirtualFiles, and it eventually does a blocking write lock instead of
     320                 :         // try_lock.
     321            7444 :         let final_parent_dirfd =
     322            7444 :             Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
     323 UBC           0 :                 .await
     324 CBC        7444 :                 .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
     325            7444 :         final_parent_dirfd
     326            7444 :             .sync_all()
     327 UBC           0 :             .await
     328 CBC        7444 :             .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
     329            7444 :         Ok(())
     330            7444 :     }
     331                 : 
     332                 :     /// Call File::sync_all() on the underlying File.
     333           34011 :     pub async fn sync_all(&self) -> Result<(), Error> {
     334           34011 :         self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
     335 UBC           0 :             .await?
     336 CBC       34011 :     }
     337                 : 
     338           19123 :     pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
     339           19123 :         self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
     340 UBC           0 :             .await?
     341 CBC       19123 :     }
     342                 : 
     343                 :     /// Helper function that looks up the underlying File for this VirtualFile,
     344                 :     /// opening it and evicting some other File if necessary. It calls 'func'
     345                 :     /// with the physical File.
     346        15897744 :     async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
     347        15897744 :     where
     348        15897744 :         F: FnMut(&File) -> R,
     349        15897744 :     {
     350        15897744 :         let open_files = get_open_files();
     351                 : 
     352          122783 :         let mut handle_guard = {
     353                 :             // Read the cached slot handle, and see if the slot that it points to still
     354                 :             // contains our File.
     355                 :             //
     356                 :             // We only need to hold the handle lock while we read the current handle. If
     357                 :             // another thread closes the file and recycles the slot for a different file,
     358                 :             // we will notice that the handle we read is no longer valid and retry.
     359        15897744 :             let mut handle = *self.handle.read().unwrap();
     360        15940806 :             loop {
     361        15940806 :                 // Check if the slot contains our File
     362        15940806 :                 {
     363        15940806 :                     let slot = &open_files.slots[handle.index];
     364        15940806 :                     let slot_guard = slot.inner.read().unwrap();
     365        15940806 :                     if slot_guard.tag == handle.tag {
     366        15774961 :                         if let Some(file) = &slot_guard.file {
     367                 :                             // Found a cached file descriptor.
     368        15774961 :                             slot.recently_used.store(true, Ordering::Relaxed);
     369        15774961 :                             return Ok(STORAGE_IO_TIME_METRIC
     370        15774961 :                                 .get(op)
     371        15774961 :                                 .observe_closure_duration(|| func(file)));
     372 UBC           0 :                         }
     373 CBC      165845 :                     }
     374                 :                 }
     375                 : 
     376                 :                 // The slot didn't contain our File. We will have to open it ourselves,
     377                 :                 // but before that, grab a write lock on handle in the VirtualFile, so
     378                 :                 // that no other thread will try to concurrently open the same file.
     379          165845 :                 let handle_guard = self.handle.write().unwrap();
     380          165845 : 
     381          165845 :                 // If another thread changed the handle while we were not holding the lock,
     382          165845 :                 // then the handle might now be valid again. Loop back to retry.
     383          165845 :                 if *handle_guard != handle {
     384           43062 :                     handle = *handle_guard;
     385           43062 :                     continue;
     386          122783 :                 }
     387          122783 :                 break handle_guard;
     388          122783 :             }
     389          122783 :         };
     390          122783 : 
     391          122783 :         // We need to open the file ourselves. The handle in the VirtualFile is
     392          122783 :         // now locked in write-mode. Find a free slot to put it in.
     393          122783 :         let (handle, mut slot_guard) = open_files.find_victim_slot();
     394                 : 
     395                 :         // Open the physical file
     396          122783 :         let file = STORAGE_IO_TIME_METRIC
     397          122783 :             .get(StorageIoOperation::Open)
     398          122783 :             .observe_closure_duration(|| self.open_options.open(&self.path))?;
     399                 : 
     400                 :         // Perform the requested operation on it
     401          122783 :         let result = STORAGE_IO_TIME_METRIC
     402          122783 :             .get(op)
     403          122783 :             .observe_closure_duration(|| func(&file));
     404          122783 : 
     405          122783 :         // Store the File in the slot and update the handle in the VirtualFile
     406          122783 :         // to point to it.
     407          122783 :         slot_guard.file.replace(file);
     408          122783 : 
     409          122783 :         *handle_guard = handle;
     410          122783 : 
     411          122783 :         Ok(result)
     412        15897744 :     }
     413                 : 
     414 UBC           0 :     pub fn remove(self) {
     415               0 :         let path = self.path.clone();
     416               0 :         drop(self);
     417               0 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     418               0 :     }
     419                 : 
     420 CBC       57390 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     421           57389 :         match pos {
     422           57384 :             SeekFrom::Start(offset) => {
     423           57384 :                 self.pos = offset;
     424           57384 :             }
     425               2 :             SeekFrom::End(offset) => {
     426               2 :                 self.pos = self
     427               2 :                     .with_file(StorageIoOperation::Seek, |mut file| {
     428               2 :                         file.seek(SeekFrom::End(offset))
     429               2 :                     })
     430               1 :                     .await??
     431                 :             }
     432               3 :             SeekFrom::Current(offset) => {
     433               3 :                 let pos = self.pos as i128 + offset as i128;
     434               3 :                 if pos < 0 {
     435               1 :                     return Err(Error::new(
     436               1 :                         ErrorKind::InvalidInput,
     437               1 :                         "offset would be negative",
     438               1 :                     ));
     439               2 :                 }
     440               2 :                 if pos > u64::MAX as i128 {
     441 UBC           0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     442 CBC           2 :                 }
     443               2 :                 self.pos = pos as u64;
     444                 :             }
     445                 :         }
     446           57387 :         Ok(self.pos)
     447           57389 :     }
     448                 : 
     449                 :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     450         6726981 :     pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
     451        13453944 :         while !buf.is_empty() {
     452         6726972 :             match self.read_at(buf, offset).await {
     453                 :                 Ok(0) => {
     454 UBC           0 :                     return Err(Error::new(
     455               0 :                         std::io::ErrorKind::UnexpectedEof,
     456               0 :                         "failed to fill whole buffer",
     457               0 :                     ))
     458                 :                 }
     459 CBC     6726972 :                 Ok(n) => {
     460         6726972 :                     buf = &mut buf[n..];
     461         6726972 :                     offset += n as u64;
     462         6726972 :                 }
     463 UBC           0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     464               0 :                 Err(e) => return Err(e),
     465                 :             }
     466                 :         }
     467 CBC     6726972 :         Ok(())
     468         6726972 :     }
     469                 : 
     470                 :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     471         3660159 :     pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
     472         7320307 :         while !buf.is_empty() {
     473         3660154 :             match self.write_at(buf, offset).await {
     474                 :                 Ok(0) => {
     475 UBC           0 :                     return Err(Error::new(
     476               0 :                         std::io::ErrorKind::WriteZero,
     477               0 :                         "failed to write whole buffer",
     478               0 :                     ));
     479                 :                 }
     480 CBC     3660153 :                 Ok(n) => {
     481         3660153 :                     buf = &buf[n..];
     482         3660153 :                     offset += n as u64;
     483         3660153 :                 }
     484 UBC           0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     485               0 :                 Err(e) => return Err(e),
     486                 :             }
     487                 :         }
     488 CBC     3660153 :         Ok(())
     489         3660153 :     }
     490                 : 
     491         5457283 :     pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
     492        10914526 :         while !buf.is_empty() {
     493         5457260 :             match self.write(buf).await {
     494                 :                 Ok(0) => {
     495 UBC           0 :                     return Err(Error::new(
     496               0 :                         std::io::ErrorKind::WriteZero,
     497               0 :                         "failed to write whole buffer",
     498               0 :                     ));
     499                 :                 }
     500 CBC     5457258 :                 Ok(n) => {
     501         5457258 :                     buf = &buf[n..];
     502         5457258 :                 }
     503               1 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     504               1 :                 Err(e) => return Err(e),
     505                 :             }
     506                 :         }
     507         5457266 :         Ok(())
     508         5457267 :     }
     509                 : 
     510         5457275 :     async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
     511         5457260 :         let pos = self.pos;
     512         5457260 :         let n = self.write_at(buf, pos).await?;
     513         5457258 :         self.pos += n as u64;
     514         5457258 :         Ok(n)
     515         5457259 :     }
     516                 : 
     517         6727203 :     pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
     518         6727194 :         let result = self
     519         6727194 :             .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
     520 UBC           0 :             .await?;
     521 CBC     6727194 :         if let Ok(size) = result {
     522         6727193 :             STORAGE_IO_SIZE
     523         6727193 :                 .with_label_values(&["read", &self.tenant_id, &self.timeline_id])
     524         6727193 :                 .add(size as i64);
     525         6727193 :         }
     526         6727194 :         result
     527         6727194 :     }
     528                 : 
     529         9117434 :     async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
     530         9117414 :         let result = self
     531         9117414 :             .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
     532 UBC           0 :             .await?;
     533 CBC     9117412 :         if let Ok(size) = result {
     534         9117411 :             STORAGE_IO_SIZE
     535         9117411 :                 .with_label_values(&["write", &self.tenant_id, &self.timeline_id])
     536         9117411 :                 .add(size as i64);
     537         9117411 :         }
     538         9117412 :         result
     539         9117412 :     }
     540                 : }
     541                 : 
     542                 : #[cfg(test)]
     543                 : impl VirtualFile {
     544           14114 :     pub(crate) async fn read_blk(
     545           14114 :         &self,
     546           14114 :         blknum: u32,
     547           14114 :     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
     548           14114 :         use crate::page_cache::PAGE_SZ;
     549           14114 :         let mut buf = [0; PAGE_SZ];
     550           14114 :         self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
     551 UBC           0 :             .await?;
     552 CBC       14114 :         Ok(std::sync::Arc::new(buf).into())
     553           14114 :     }
     554                 : 
     555             112 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
     556             222 :         loop {
     557             222 :             let mut tmp = [0; 128];
     558             222 :             match self.read_at(&mut tmp, self.pos).await {
     559             111 :                 Ok(0) => return Ok(()),
     560             110 :                 Ok(n) => {
     561             110 :                     self.pos += n as u64;
     562             110 :                     buf.extend_from_slice(&tmp[..n]);
     563             110 :                 }
     564               1 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     565               1 :                 Err(e) => return Err(e),
     566                 :             }
     567                 :         }
     568             112 :     }
     569                 : }
     570                 : 
     571                 : impl Drop for VirtualFile {
     572                 :     /// If a VirtualFile is dropped, close the underlying file if it was open.
     573           46964 :     fn drop(&mut self) {
     574           46964 :         let handle = self.handle.get_mut().unwrap();
     575           46964 : 
     576           46964 :         // We could check with a read-lock first, to avoid waiting on an
     577           46964 :         // unrelated I/O.
     578           46964 :         let slot = &get_open_files().slots[handle.index];
     579           46964 :         let mut slot_guard = slot.inner.write().unwrap();
     580           46964 :         if slot_guard.tag == handle.tag {
     581           43632 :             slot.recently_used.store(false, Ordering::Relaxed);
     582           43632 :             // there is also operation "close-by-replace" for closes done on eviction for
     583           43632 :             // comparison.
     584           43632 :             STORAGE_IO_TIME_METRIC
     585           43632 :                 .get(StorageIoOperation::Close)
     586           43632 :                 .observe_closure_duration(|| drop(slot_guard.file.take()));
     587           43632 :         }
     588           46964 :     }
     589                 : }
     590                 : 
     591                 : impl OpenFiles {
     592             561 :     fn new(num_slots: usize) -> OpenFiles {
     593             561 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
     594           56010 :         for _ in 0..num_slots {
     595           56010 :             let slot = Slot {
     596           56010 :                 recently_used: AtomicBool::new(false),
     597           56010 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
     598           56010 :             };
     599           56010 :             slots.push(slot);
     600           56010 :         }
     601                 : 
     602             561 :         OpenFiles {
     603             561 :             next: AtomicUsize::new(0),
     604             561 :             slots: Box::leak(slots),
     605             561 :         }
     606             561 :     }
     607                 : }
     608                 : 
     609                 : ///
     610                 : /// Initialize the virtual file module. This must be called once at page
     611                 : /// server startup.
     612                 : ///
     613             560 : pub fn init(num_slots: usize) {
     614             560 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
     615 UBC           0 :         panic!("virtual_file::init called twice");
     616 CBC         560 :     }
     617             560 : }
     618                 : 
     619                 : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
     620                 : 
     621                 : // Get a handle to the global slots array.
     622        15999478 : fn get_open_files() -> &'static OpenFiles {
     623        15999478 :     //
     624        15999478 :     // In unit tests, page server startup doesn't happen and no one calls
     625        15999478 :     // virtual_file::init(). Initialize it here, with a small array.
     626        15999478 :     //
     627        15999478 :     // This applies to the virtual file tests below, but all other unit
     628        15999478 :     // tests too, so the virtual file facility is always usable in
     629        15999478 :     // unit tests.
     630        15999478 :     //
     631        15999478 :     if cfg!(test) {
     632          181770 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
     633                 :     } else {
     634        15817708 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
     635                 :     }
     636        15999478 : }
     637                 : 
     638                 : #[cfg(test)]
     639                 : mod tests {
     640                 :     use super::*;
     641                 :     use rand::seq::SliceRandom;
     642                 :     use rand::thread_rng;
     643                 :     use rand::Rng;
     644                 :     use std::future::Future;
     645                 :     use std::io::Write;
     646                 :     use std::sync::Arc;
     647                 : 
     648                 :     enum MaybeVirtualFile {
     649                 :         VirtualFile(VirtualFile),
     650                 :         File(File),
     651                 :     }
     652                 : 
     653                 :     impl From<VirtualFile> for MaybeVirtualFile {
     654               3 :         fn from(vf: VirtualFile) -> Self {
     655               3 :             MaybeVirtualFile::VirtualFile(vf)
     656               3 :         }
     657                 :     }
     658                 : 
     659                 :     impl MaybeVirtualFile {
     660             202 :         async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
     661             202 :             match self {
     662             101 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
     663             101 :                 MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
     664                 :             }
     665             202 :         }
     666               4 :         async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
     667               4 :             match self {
     668               2 :                 MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
     669               2 :                 MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
     670                 :             }
     671               4 :         }
     672              18 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     673              18 :             match self {
     674               9 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
     675               9 :                 MaybeVirtualFile::File(file) => file.seek(pos),
     676                 :             }
     677              18 :         }
     678               4 :         async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
     679               4 :             match self {
     680               2 :                 MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
     681               2 :                 MaybeVirtualFile::File(file) => file.write_all(buf),
     682                 :             }
     683               4 :         }
     684                 : 
     685                 :         // Helper function to slurp contents of a file, starting at the current position,
     686                 :         // into a string
     687             221 :         async fn read_string(&mut self) -> Result<String, Error> {
     688             221 :             use std::io::Read;
     689             221 :             let mut buf = String::new();
     690             221 :             match self {
     691             112 :                 MaybeVirtualFile::VirtualFile(file) => {
     692             112 :                     let mut buf = Vec::new();
     693             112 :                     file.read_to_end(&mut buf).await?;
     694             111 :                     return Ok(String::from_utf8(buf).unwrap());
     695                 :                 }
     696             109 :                 MaybeVirtualFile::File(file) => {
     697             109 :                     file.read_to_string(&mut buf)?;
     698                 :                 }
     699                 :             }
     700             108 :             Ok(buf)
     701             221 :         }
     702                 : 
     703                 :         // Helper function to slurp a portion of a file into a string
     704             202 :         async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
     705             202 :             let mut buf = vec![0; len];
     706             202 :             self.read_exact_at(&mut buf, pos).await?;
     707             202 :             Ok(String::from_utf8(buf).unwrap())
     708             202 :         }
     709                 :     }
     710                 : 
     711               1 :     #[tokio::test]
     712               1 :     async fn test_virtual_files() -> Result<(), Error> {
     713               1 :         // The real work is done in the test_files() helper function. This
     714               1 :         // allows us to run the same set of tests against a native File, and
     715               1 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
     716               1 :         // but this allows us to verify that the operations return the same
     717               1 :         // results with VirtualFiles as with native Files. (Except that with
     718               1 :         // native files, you will run out of file descriptors if the ulimit
     719               1 :         // is low enough.)
     720             103 :         test_files("virtual_files", |path, open_options| async move {
     721             103 :             let vf = VirtualFile::open_with_options(&path, &open_options).await?;
     722             103 :             Ok(MaybeVirtualFile::VirtualFile(vf))
     723             103 :         })
     724 UBC           0 :         .await
     725                 :     }
     726                 : 
     727 CBC           1 :     #[tokio::test]
     728               1 :     async fn test_physical_files() -> Result<(), Error> {
     729             103 :         test_files("physical_files", |path, open_options| async move {
     730             103 :             Ok(MaybeVirtualFile::File(open_options.open(path)?))
     731             103 :         })
     732 UBC           0 :         .await
     733                 :     }
     734                 : 
     735 CBC           2 :     async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
     736               2 :     where
     737               2 :         OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
     738               2 :         FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
     739               2 :     {
     740               2 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
     741               2 :         std::fs::create_dir_all(&testdir)?;
     742                 : 
     743               2 :         let path_a = testdir.join("file_a");
     744               2 :         let mut file_a = openfunc(
     745               2 :             path_a.clone(),
     746               2 :             OpenOptions::new()
     747               2 :                 .write(true)
     748               2 :                 .create(true)
     749               2 :                 .truncate(true)
     750               2 :                 .to_owned(),
     751               2 :         )
     752 UBC           0 :         .await?;
     753 CBC           2 :         file_a.write_all(b"foobar").await?;
     754                 : 
     755                 :         // cannot read from a file opened in write-only mode
     756               2 :         let _ = file_a.read_string().await.unwrap_err();
     757                 : 
     758                 :         // Close the file and re-open for reading
     759               2 :         let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
     760                 : 
     761                 :         // cannot write to a file opened in read-only mode
     762               2 :         let _ = file_a.write_all(b"bar").await.unwrap_err();
     763               2 : 
     764               2 :         // Try simple read
     765               2 :         assert_eq!("foobar", file_a.read_string().await?);
     766                 : 
     767                 :         // It's positioned at the EOF now.
     768               2 :         assert_eq!("", file_a.read_string().await?);
     769                 : 
     770                 :         // Test seeks.
     771               2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     772               2 :         assert_eq!("oobar", file_a.read_string().await?);
     773                 : 
     774               2 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
     775               2 :         assert_eq!("ar", file_a.read_string().await?);
     776                 : 
     777               2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     778               2 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
     779               2 :         assert_eq!("bar", file_a.read_string().await?);
     780                 : 
     781               2 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
     782               2 :         assert_eq!("oobar", file_a.read_string().await?);
     783                 : 
     784                 :         // Test erroneous seeks to before byte 0
     785               2 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
     786               2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     787               2 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
     788               2 : 
     789               2 :         // the erroneous seek should have left the position unchanged
     790               2 :         assert_eq!("oobar", file_a.read_string().await?);
     791                 : 
     792                 :         // Create another test file, and try FileExt functions on it.
     793               2 :         let path_b = testdir.join("file_b");
     794               2 :         let mut file_b = openfunc(
     795               2 :             path_b.clone(),
     796               2 :             OpenOptions::new()
     797               2 :                 .read(true)
     798               2 :                 .write(true)
     799               2 :                 .create(true)
     800               2 :                 .truncate(true)
     801               2 :                 .to_owned(),
     802               2 :         )
     803 UBC           0 :         .await?;
     804 CBC           2 :         file_b.write_all_at(b"BAR", 3).await?;
     805               2 :         file_b.write_all_at(b"FOO", 0).await?;
     806                 : 
     807               2 :         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
     808                 : 
     809                 :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
     810                 :         // open the same file many times. The effect is the same.)
     811                 :         //
     812                 :         // leave file_a positioned at offset 1 before we start
     813               2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     814                 : 
     815               2 :         let mut vfiles = Vec::new();
     816             202 :         for _ in 0..100 {
     817             200 :             let mut vfile =
     818             200 :                 openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
     819             200 :             assert_eq!("FOOBAR", vfile.read_string().await?);
     820             200 :             vfiles.push(vfile);
     821                 :         }
     822                 : 
     823                 :         // make sure we opened enough files to definitely cause evictions.
     824               2 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
     825                 : 
     826                 :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
     827                 :         // from it again. We left the file positioned at offset 1 above.
     828               2 :         assert_eq!("oobar", file_a.read_string().await?);
     829                 : 
     830                 :         // Check that all the other FDs still work too. Use them in random order for
     831                 :         // good measure.
     832               2 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
     833             200 :         for vfile in vfiles.iter_mut() {
     834             200 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
     835                 :         }
     836                 : 
     837               2 :         Ok(())
     838               2 :     }
     839                 : 
     840                 :     /// Test using VirtualFiles from many threads concurrently. This tests both using
     841                 :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
     842                 :     /// VirtualFile from multiple threads concurrently.
     843               1 :     #[tokio::test]
     844               1 :     async fn test_vfile_concurrency() -> Result<(), Error> {
     845               1 :         const SIZE: usize = 8 * 1024;
     846               1 :         const VIRTUAL_FILES: usize = 100;
     847               1 :         const THREADS: usize = 100;
     848               1 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
     849               1 : 
     850               1 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
     851               1 :         std::fs::create_dir_all(&testdir)?;
     852                 : 
     853                 :         // Create a test file.
     854               1 :         let test_file_path = testdir.join("concurrency_test_file");
     855                 :         {
     856               1 :             let file = File::create(&test_file_path)?;
     857               1 :             file.write_all_at(&SAMPLE, 0)?;
     858                 :         }
     859                 : 
     860                 :         // Open the file many times.
     861               1 :         let mut files = Vec::new();
     862             101 :         for _ in 0..VIRTUAL_FILES {
     863             100 :             let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
     864 UBC           0 :                 .await?;
     865 CBC         100 :             files.push(f);
     866                 :         }
     867               1 :         let files = Arc::new(files);
     868               1 : 
     869               1 :         // Launch many threads, and use the virtual files concurrently in random order.
     870               1 :         let rt = tokio::runtime::Builder::new_multi_thread()
     871               1 :             .worker_threads(THREADS)
     872               1 :             .thread_name("test_vfile_concurrency thread")
     873               1 :             .build()
     874               1 :             .unwrap();
     875               1 :         let mut hdls = Vec::new();
     876             101 :         for _threadno in 0..THREADS {
     877             100 :             let files = files.clone();
     878             100 :             let hdl = rt.spawn(async move {
     879             100 :                 let mut buf = [0u8; SIZE];
     880             100 :                 let mut rng = rand::rngs::OsRng;
     881          100000 :                 for _ in 1..1000 {
     882           99900 :                     let f = &files[rng.gen_range(0..files.len())];
     883           99900 :                     f.read_exact_at(&mut buf, 0).await.unwrap();
     884           99900 :                     assert!(buf == SAMPLE);
     885                 :                 }
     886             100 :             });
     887             100 :             hdls.push(hdl);
     888             100 :         }
     889             101 :         for hdl in hdls {
     890             100 :             hdl.await?;
     891                 :         }
     892               1 :         std::mem::forget(rt);
     893               1 : 
     894               1 :         Ok(())
     895                 :     }
     896                 : 
     897               1 :     #[tokio::test]
     898               1 :     async fn test_atomic_overwrite_basic() {
     899               1 :         let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
     900               1 :         std::fs::create_dir_all(&testdir).unwrap();
     901               1 : 
     902               1 :         let path = testdir.join("myfile");
     903               1 :         let tmp_path = testdir.join("myfile.tmp");
     904               1 : 
     905               1 :         VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
     906 UBC           0 :             .await
     907 CBC           1 :             .unwrap();
     908               1 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
     909               1 :         let post = file.read_string().await.unwrap();
     910               1 :         assert_eq!(post, "foo");
     911               1 :         assert!(!tmp_path.exists());
     912               1 :         drop(file);
     913               1 : 
     914               1 :         VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
     915 UBC           0 :             .await
     916 CBC           1 :             .unwrap();
     917               1 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
     918               1 :         let post = file.read_string().await.unwrap();
     919               1 :         assert_eq!(post, "bar");
     920               1 :         assert!(!tmp_path.exists());
     921               1 :         drop(file);
     922                 :     }
     923                 : 
     924               1 :     #[tokio::test]
     925               1 :     async fn test_atomic_overwrite_preexisting_tmp() {
     926               1 :         let testdir =
     927               1 :             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
     928               1 :         std::fs::create_dir_all(&testdir).unwrap();
     929               1 : 
     930               1 :         let path = testdir.join("myfile");
     931               1 :         let tmp_path = testdir.join("myfile.tmp");
     932               1 : 
     933               1 :         std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
     934               1 :         assert!(tmp_path.exists());
     935                 : 
     936               1 :         VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
     937 UBC           0 :             .await
     938 CBC           1 :             .unwrap();
     939                 : 
     940               1 :         let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
     941               1 :         let post = file.read_string().await.unwrap();
     942               1 :         assert_eq!(post, "foo");
     943               1 :         assert!(!tmp_path.exists());
     944               1 :         drop(file);
     945                 :     }
     946                 : }
        

Generated by: LCOV version 2.1-beta