LCOV - code coverage report
Current view: top level - pageserver/src - virtual_file.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 92.2 % 497 458
Test Date: 2023-09-06 10:18:01 Functions: 87.5 % 88 77

            Line data    Source code
       1              : //!
       2              : //! VirtualFile is like a normal File, but it's not bound directly to
       3              : //! a file descriptor. Instead, the file is opened when it's read from,
       4              : //! and if too many files are open globally in the system, least-recently
       5              : //! used ones are closed.
       6              : //!
       7              : //! To track which files have been recently used, we use the clock algorithm
       8              : //! with a 'recently_used' flag on each slot.
       9              : //!
      10              : //! This is similar to PostgreSQL's virtual file descriptor facility in
      11              : //! src/backend/storage/file/fd.c
      12              : //!
      13              : use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
      14              : use crate::tenant::TENANTS_SEGMENT_NAME;
      15              : use once_cell::sync::OnceCell;
      16              : use std::fs::{self, File, OpenOptions};
      17              : use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
      18              : use std::os::unix::fs::FileExt;
      19              : use std::path::{Path, PathBuf};
      20              : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
      21              : use std::sync::{RwLock, RwLockWriteGuard};
      22              : 
      23              : ///
      24              : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
      25              : /// the underlying file is closed if the system is low on file descriptors,
      26              : /// and re-opened when it's accessed again.
      27              : ///
      28              : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
      29              : /// holding just a shared reference the same VirtualFile, using the read_at() / write_at()
      30              : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
      31              : /// require a mutable reference, because they modify the "current position".
      32              : ///
      33              : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
      34              : /// slot that 'handle points to, if the underlying file is currently open. If it's not
      35              : /// currently open, the 'handle' can still point to the slot where it was last kept. The
      36              : /// 'tag' field is used to detect whether the handle still is valid or not.
      37              : ///
      38            0 : #[derive(Debug)]
      39              : pub struct VirtualFile {
      40              :     /// Lazy handle to the global file descriptor cache. The slot that this points to
      41              :     /// might contain our File, or it may be empty, or it may contain a File that
      42              :     /// belongs to a different VirtualFile.
      43              :     handle: RwLock<SlotHandle>,
      44              : 
      45              :     /// Current file position
      46              :     pos: u64,
      47              : 
      48              :     /// File path and options to use to open it.
      49              :     ///
      50              :     /// Note: this only contains the options needed to re-open it. For example,
      51              :     /// if a new file is created, we only pass the create flag when it's initially
      52              :     /// opened, in the VirtualFile::create() function, and strip the flag before
      53              :     /// storing it here.
      54              :     pub path: PathBuf,
      55              :     open_options: OpenOptions,
      56              : 
      57              :     // These are strings becase we only use them for metrics, and those expect strings.
      58              :     // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
      59              :     // strings.
      60              :     tenant_id: String,
      61              :     timeline_id: String,
      62              : }
      63              : 
      64       146154 : #[derive(Debug, PartialEq, Clone, Copy)]
      65              : struct SlotHandle {
      66              :     /// Index into OPEN_FILES.slots
      67              :     index: usize,
      68              : 
      69              :     /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
      70              :     /// been recycled and no longer contains the FD for this virtual file.
      71              :     tag: u64,
      72              : }
      73              : 
      74              : /// OPEN_FILES is the global array that holds the physical file descriptors that
      75              : /// are currently open. Each slot in the array is protected by a separate lock,
      76              : /// so that different files can be accessed independently. The lock must be held
      77              : /// in write mode to replace the slot with a different file, but a read mode
      78              : /// is enough to operate on the file, whether you're reading or writing to it.
      79              : ///
      80              : /// OPEN_FILES starts in uninitialized state, and it's initialized by
      81              : /// the virtual_file::init() function. It must be called exactly once at page
      82              : /// server startup.
      83              : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
      84              : 
      85              : struct OpenFiles {
      86              :     slots: &'static [Slot],
      87              : 
      88              :     /// clock arm for the clock algorithm
      89              :     next: AtomicUsize,
      90              : }
      91              : 
      92              : struct Slot {
      93              :     inner: RwLock<SlotInner>,
      94              : 
      95              :     /// has this file been used since last clock sweep?
      96              :     recently_used: AtomicBool,
      97              : }
      98              : 
      99              : struct SlotInner {
     100              :     /// Counter that's incremented every time a different file is stored here.
     101              :     /// To avoid the ABA problem.
     102              :     tag: u64,
     103              : 
     104              :     /// the underlying file
     105              :     file: Option<File>,
     106              : }
     107              : 
     108              : impl OpenFiles {
     109              :     /// Find a slot to use, evicting an existing file descriptor if needed.
     110              :     ///
     111              :     /// On return, we hold a lock on the slot, and its 'tag' has been updated
     112              :     /// recently_used has been set. It's all ready for reuse.
     113       194851 :     fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
     114       194851 :         //
     115       194851 :         // Run the clock algorithm to find a slot to replace.
     116       194851 :         //
     117       194851 :         let num_slots = self.slots.len();
     118       194851 :         let mut retries = 0;
     119              :         let mut slot;
     120              :         let mut slot_guard;
     121              :         let index;
     122       476660 :         loop {
     123       476660 :             let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
     124       476660 :             slot = &self.slots[next];
     125       476660 : 
     126       476660 :             // If the recently_used flag on this slot is set, continue the clock
     127       476660 :             // sweep. Otherwise try to use this slot. If we cannot acquire the
     128       476660 :             // lock, also continue the clock sweep.
     129       476660 :             //
     130       476660 :             // We only continue in this manner for a while, though. If we loop
     131       476660 :             // through the array twice without finding a victim, just pick the
     132       476660 :             // next slot and wait until we can reuse it. This way, we avoid
     133       476660 :             // spinning in the extreme case that all the slots are busy with an
     134       476660 :             // I/O operation.
     135       476660 :             if retries < num_slots * 2 {
     136       475696 :                 if !slot.recently_used.swap(false, Ordering::Release) {
     137       249539 :                     if let Ok(guard) = slot.inner.try_write() {
     138       193887 :                         slot_guard = guard;
     139       193887 :                         index = next;
     140       193887 :                         break;
     141        55652 :                     }
     142       226157 :                 }
     143       281809 :                 retries += 1;
     144              :             } else {
     145          964 :                 slot_guard = slot.inner.write().unwrap();
     146          964 :                 index = next;
     147          964 :                 break;
     148              :             }
     149              :         }
     150              : 
     151              :         //
     152              :         // We now have the victim slot locked. If it was in use previously, close the
     153              :         // old file.
     154              :         //
     155       194851 :         if let Some(old_file) = slot_guard.file.take() {
     156       124062 :             // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
     157       124062 :             // distinguish the two.
     158       124062 :             STORAGE_IO_TIME
     159       124062 :                 .with_label_values(&["close-by-replace"])
     160       124062 :                 .observe_closure_duration(|| drop(old_file));
     161       124062 :         }
     162              : 
     163              :         // Prepare the slot for reuse and return it
     164       194851 :         slot_guard.tag += 1;
     165       194851 :         slot.recently_used.store(true, Ordering::Relaxed);
     166       194851 :         (
     167       194851 :             SlotHandle {
     168       194851 :                 index,
     169       194851 :                 tag: slot_guard.tag,
     170       194851 :             },
     171       194851 :             slot_guard,
     172       194851 :         )
     173       194851 :     }
     174              : }
     175              : 
     176            0 : #[derive(Debug, thiserror::Error)]
     177              : pub enum CrashsafeOverwriteError {
     178              :     #[error("final path has no parent dir")]
     179              :     FinalPathHasNoParentDir,
     180              :     #[error("remove tempfile: {0}")]
     181              :     RemovePreviousTempfile(#[source] std::io::Error),
     182              :     #[error("create tempfile: {0}")]
     183              :     CreateTempfile(#[source] std::io::Error),
     184              :     #[error("write tempfile: {0}")]
     185              :     WriteContents(#[source] std::io::Error),
     186              :     #[error("sync tempfile: {0}")]
     187              :     SyncTempfile(#[source] std::io::Error),
     188              :     #[error("rename tempfile to final path: {0}")]
     189              :     RenameTempfileToFinalPath(#[source] std::io::Error),
     190              :     #[error("open final path parent dir: {0}")]
     191              :     OpenFinalPathParentDir(#[source] std::io::Error),
     192              :     #[error("sync final path parent dir: {0}")]
     193              :     SyncFinalPathParentDir(#[source] std::io::Error),
     194              : }
     195              : impl CrashsafeOverwriteError {
     196              :     /// Returns true iff the new contents are durably stored.
     197            0 :     pub fn are_new_contents_durable(&self) -> bool {
     198            0 :         match self {
     199            0 :             Self::FinalPathHasNoParentDir => false,
     200            0 :             Self::RemovePreviousTempfile(_) => false,
     201            0 :             Self::CreateTempfile(_) => false,
     202            0 :             Self::WriteContents(_) => false,
     203            0 :             Self::SyncTempfile(_) => false,
     204            0 :             Self::RenameTempfileToFinalPath(_) => false,
     205            0 :             Self::OpenFinalPathParentDir(_) => false,
     206            0 :             Self::SyncFinalPathParentDir(_) => true,
     207              :         }
     208            0 :     }
     209              : }
     210              : 
     211              : impl VirtualFile {
     212              :     /// Open a file in read-only mode. Like File::open.
     213        36662 :     pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
     214        36662 :         Self::open_with_options(path, OpenOptions::new().read(true))
     215        36662 :     }
     216              : 
     217              :     /// Create a new file for writing. If the file exists, it will be truncated.
     218              :     /// Like File::create.
     219        15514 :     pub fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
     220        15514 :         Self::open_with_options(
     221        15514 :             path,
     222        15514 :             OpenOptions::new().write(true).create(true).truncate(true),
     223        15514 :         )
     224        15514 :     }
     225              : 
     226              :     /// Open a file with given options.
     227              :     ///
     228              :     /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
     229              :     /// they will be applied also when the file is subsequently re-opened, not only
     230              :     /// on the first time. Make sure that's sane!
     231        76675 :     pub fn open_with_options(
     232        76675 :         path: &Path,
     233        76675 :         open_options: &OpenOptions,
     234        76675 :     ) -> Result<VirtualFile, std::io::Error> {
     235        76675 :         let path_str = path.to_string_lossy();
     236        76675 :         let parts = path_str.split('/').collect::<Vec<&str>>();
     237        76675 :         let tenant_id;
     238        76675 :         let timeline_id;
     239        76675 :         if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
     240        59573 :             tenant_id = parts[parts.len() - 4].to_string();
     241        59573 :             timeline_id = parts[parts.len() - 2].to_string();
     242        59573 :         } else {
     243        17102 :             tenant_id = "*".to_string();
     244        17102 :             timeline_id = "*".to_string();
     245        17102 :         }
     246        76675 :         let (handle, mut slot_guard) = get_open_files().find_victim_slot();
     247        76675 :         let file = STORAGE_IO_TIME
     248        76675 :             .with_label_values(&["open"])
     249        76675 :             .observe_closure_duration(|| open_options.open(path))?;
     250              : 
     251              :         // Strip all options other than read and write.
     252              :         //
     253              :         // It would perhaps be nicer to check just for the read and write flags
     254              :         // explicitly, but OpenOptions doesn't contain any functions to read flags,
     255              :         // only to set them.
     256        76675 :         let mut reopen_options = open_options.clone();
     257        76675 :         reopen_options.create(false);
     258        76675 :         reopen_options.create_new(false);
     259        76675 :         reopen_options.truncate(false);
     260        76675 : 
     261        76675 :         let vfile = VirtualFile {
     262        76675 :             handle: RwLock::new(handle),
     263        76675 :             pos: 0,
     264        76675 :             path: path.to_path_buf(),
     265        76675 :             open_options: reopen_options,
     266        76675 :             tenant_id,
     267        76675 :             timeline_id,
     268        76675 :         };
     269        76675 : 
     270        76675 :         slot_guard.file.replace(file);
     271        76675 : 
     272        76675 :         Ok(vfile)
     273        76675 :     }
     274              : 
     275              :     /// Writes a file to the specified `final_path` in a crash safe fasion
     276              :     ///
     277              :     /// The file is first written to the specified tmp_path, and in a second
     278              :     /// step, the tmp path is renamed to the final path. As renames are
     279              :     /// atomic, a crash during the write operation will never leave behind a
     280              :     /// partially written file.
     281         8157 :     pub async fn crashsafe_overwrite(
     282         8157 :         final_path: &Path,
     283         8157 :         tmp_path: &Path,
     284         8157 :         content: &[u8],
     285         8157 :     ) -> Result<(), CrashsafeOverwriteError> {
     286         8157 :         let Some(final_path_parent) = final_path.parent() else {
     287            0 :             return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
     288              :         };
     289         8157 :         match std::fs::remove_file(tmp_path) {
     290            0 :             Ok(()) => {}
     291         8157 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     292            0 :             Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
     293              :         }
     294         8157 :         let mut file = Self::open_with_options(
     295         8157 :             tmp_path,
     296         8157 :             OpenOptions::new()
     297         8157 :                 .write(true)
     298         8157 :                 // Use `create_new` so that, if we race with ourselves or something else,
     299         8157 :                 // we bail out instead of causing damage.
     300         8157 :                 .create_new(true),
     301         8157 :         )
     302         8157 :         .map_err(CrashsafeOverwriteError::CreateTempfile)?;
     303         8157 :         file.write_all(content)
     304         8157 :             .map_err(CrashsafeOverwriteError::WriteContents)?;
     305         8157 :         file.sync_all()
     306         8157 :             .map_err(CrashsafeOverwriteError::SyncTempfile)?;
     307         8157 :         drop(file); // before the rename, that's important!
     308         8157 :                     // renames are atomic
     309         8157 :         std::fs::rename(tmp_path, final_path)
     310         8157 :             .map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
     311              :         // Only open final path parent dirfd now, so that this operation only
     312              :         // ever holds one VirtualFile fd at a time.  That's important because
     313              :         // the current `find_victim_slot` impl might pick the same slot for both
     314              :         // VirtualFile., and it eventually does a blocking write lock instead of
     315              :         // try_lock.
     316         8157 :         let final_parent_dirfd =
     317         8157 :             Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
     318         8157 :                 .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
     319         8157 :         final_parent_dirfd
     320         8157 :             .sync_all()
     321         8157 :             .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
     322         8157 :         Ok(())
     323         8157 :     }
     324              : 
     325              :     /// Call File::sync_all() on the underlying File.
     326              :     pub fn sync_all(&self) -> Result<(), Error> {
     327        57456 :         self.with_file("fsync", |file| file.sync_all())?
     328        57456 :     }
     329              : 
     330        16620 :     pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
     331        16620 :         self.with_file("metadata", |file| file.metadata())?
     332        16620 :     }
     333              : 
     334              :     /// Helper function that looks up the underlying File for this VirtualFile,
     335              :     /// opening it and evicting some other File if necessary. It calls 'func'
     336              :     /// with the physical File.
     337     19418094 :     fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
     338     19418094 :     where
     339     19418094 :         F: FnMut(&File) -> R,
     340     19418094 :     {
     341     19418094 :         let open_files = get_open_files();
     342              : 
     343       118176 :         let mut handle_guard = {
     344              :             // Read the cached slot handle, and see if the slot that it points to still
     345              :             // contains our File.
     346              :             //
     347              :             // We only need to hold the handle lock while we read the current handle. If
     348              :             // another thread closes the file and recycles the slot for a different file,
     349              :             // we will notice that the handle we read is no longer valid and retry.
     350     19418094 :             let mut handle = *self.handle.read().unwrap();
     351     19446072 :             loop {
     352     19446072 :                 // Check if the slot contains our File
     353     19446072 :                 {
     354     19446072 :                     let slot = &open_files.slots[handle.index];
     355     19446072 :                     let slot_guard = slot.inner.read().unwrap();
     356     19446072 :                     if slot_guard.tag == handle.tag {
     357     19299918 :                         if let Some(file) = &slot_guard.file {
     358              :                             // Found a cached file descriptor.
     359     19299918 :                             slot.recently_used.store(true, Ordering::Relaxed);
     360     19299918 :                             return Ok(STORAGE_IO_TIME
     361     19299918 :                                 .with_label_values(&[op])
     362     19299918 :                                 .observe_closure_duration(|| func(file)));
     363            0 :                         }
     364       146154 :                     }
     365              :                 }
     366              : 
     367              :                 // The slot didn't contain our File. We will have to open it ourselves,
     368              :                 // but before that, grab a write lock on handle in the VirtualFile, so
     369              :                 // that no other thread will try to concurrently open the same file.
     370       146154 :                 let handle_guard = self.handle.write().unwrap();
     371       146154 : 
     372       146154 :                 // If another thread changed the handle while we were not holding the lock,
     373       146154 :                 // then the handle might now be valid again. Loop back to retry.
     374       146154 :                 if *handle_guard != handle {
     375        27978 :                     handle = *handle_guard;
     376        27978 :                     continue;
     377       118176 :                 }
     378       118176 :                 break handle_guard;
     379       118176 :             }
     380       118176 :         };
     381       118176 : 
     382       118176 :         // We need to open the file ourselves. The handle in the VirtualFile is
     383       118176 :         // now locked in write-mode. Find a free slot to put it in.
     384       118176 :         let (handle, mut slot_guard) = open_files.find_victim_slot();
     385              : 
     386              :         // Open the physical file
     387       118176 :         let file = STORAGE_IO_TIME
     388       118176 :             .with_label_values(&["open"])
     389       118176 :             .observe_closure_duration(|| self.open_options.open(&self.path))?;
     390              : 
     391              :         // Perform the requested operation on it
     392       118176 :         let result = STORAGE_IO_TIME
     393       118176 :             .with_label_values(&[op])
     394       118176 :             .observe_closure_duration(|| func(&file));
     395       118176 : 
     396       118176 :         // Store the File in the slot and update the handle in the VirtualFile
     397       118176 :         // to point to it.
     398       118176 :         slot_guard.file.replace(file);
     399       118176 : 
     400       118176 :         *handle_guard = handle;
     401       118176 : 
     402       118176 :         Ok(result)
     403     19418094 :     }
     404              : 
     405            0 :     pub fn remove(self) {
     406            0 :         let path = self.path.clone();
     407            0 :         drop(self);
     408            0 :         std::fs::remove_file(path).expect("failed to remove the virtual file");
     409            0 :     }
     410              : 
     411        49876 :     pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     412        49876 :         match pos {
     413        49871 :             SeekFrom::Start(offset) => {
     414        49871 :                 self.pos = offset;
     415        49871 :             }
     416            2 :             SeekFrom::End(offset) => {
     417            2 :                 self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
     418              :             }
     419            3 :             SeekFrom::Current(offset) => {
     420            3 :                 let pos = self.pos as i128 + offset as i128;
     421            3 :                 if pos < 0 {
     422            1 :                     return Err(Error::new(
     423            1 :                         ErrorKind::InvalidInput,
     424            1 :                         "offset would be negative",
     425            1 :                     ));
     426            2 :                 }
     427            2 :                 if pos > u64::MAX as i128 {
     428            0 :                     return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
     429            2 :                 }
     430            2 :                 self.pos = pos as u64;
     431              :             }
     432              :         }
     433        49874 :         Ok(self.pos)
     434        49876 :     }
     435              : 
     436              :     #[cfg(test)]
     437          109 :     async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
     438          216 :         loop {
     439          216 :             let mut tmp = [0; 128];
     440          216 :             match self.read_at(&mut tmp, self.pos).await {
     441          108 :                 Ok(0) => return Ok(()),
     442          107 :                 Ok(n) => {
     443          107 :                     self.pos += n as u64;
     444          107 :                     buf.extend_from_slice(&tmp[..n]);
     445          107 :                 }
     446            1 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     447            1 :                 Err(e) => return Err(e),
     448              :             }
     449              :         }
     450          109 :     }
     451              : 
     452              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
     453      7088754 :     pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
     454     14177432 :         while !buf.is_empty() {
     455      7088727 :             match self.read_at(buf, offset).await {
     456              :                 Ok(0) => {
     457           11 :                     return Err(Error::new(
     458           11 :                         std::io::ErrorKind::UnexpectedEof,
     459           11 :                         "failed to fill whole buffer",
     460           11 :                     ))
     461              :                 }
     462      7088716 :                 Ok(n) => {
     463      7088716 :                     buf = &mut buf[n..];
     464      7088716 :                     offset += n as u64;
     465      7088716 :                 }
     466            0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     467            0 :                 Err(e) => return Err(e),
     468              :             }
     469              :         }
     470      7088705 :         Ok(())
     471      7088716 :     }
     472              : 
     473              :     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
     474      3950826 :     pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
     475      7901649 :         while !buf.is_empty() {
     476      3950825 :             match self.write_at(buf, offset) {
     477              :                 Ok(0) => {
     478            0 :                     return Err(Error::new(
     479            0 :                         std::io::ErrorKind::WriteZero,
     480            0 :                         "failed to write whole buffer",
     481            0 :                     ));
     482              :                 }
     483      3950824 :                 Ok(n) => {
     484      3950824 :                     buf = &buf[n..];
     485      3950824 :                     offset += n as u64;
     486      3950824 :                 }
     487            0 :                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
     488            0 :                 Err(e) => return Err(e),
     489              :             }
     490              :         }
     491      3950824 :         Ok(())
     492      3950824 :     }
     493              : 
     494      7088981 :     pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
     495      7088943 :         let result = self.with_file("read", |file| file.read_at(buf, offset))?;
     496      7088943 :         if let Ok(size) = result {
     497      7088942 :             STORAGE_IO_SIZE
     498      7088942 :                 .with_label_values(&["read", &self.tenant_id, &self.timeline_id])
     499      7088942 :                 .add(size as i64);
     500      7088942 :         }
     501      7088943 :         result
     502      7088943 :     }
     503              : 
     504     12255104 :     pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
     505     12255104 :         let result = self.with_file("write", |file| file.write_at(buf, offset))?;
     506     12255104 :         if let Ok(size) = result {
     507     12255101 :             STORAGE_IO_SIZE
     508     12255101 :                 .with_label_values(&["write", &self.tenant_id, &self.timeline_id])
     509     12255101 :                 .add(size as i64);
     510     12255101 :         }
     511     12255102 :         result
     512     12255102 :     }
     513              : }
     514              : 
     515              : impl Drop for VirtualFile {
     516              :     /// If a VirtualFile is dropped, close the underlying file if it was open.
     517        69968 :     fn drop(&mut self) {
     518        69968 :         let handle = self.handle.get_mut().unwrap();
     519        69968 : 
     520        69968 :         // We could check with a read-lock first, to avoid waiting on an
     521        69968 :         // unrelated I/O.
     522        69968 :         let slot = &get_open_files().slots[handle.index];
     523        69968 :         let mut slot_guard = slot.inner.write().unwrap();
     524        69968 :         if slot_guard.tag == handle.tag {
     525        67330 :             slot.recently_used.store(false, Ordering::Relaxed);
     526        67330 :             // there is also operation "close-by-replace" for closes done on eviction for
     527        67330 :             // comparison.
     528        67330 :             STORAGE_IO_TIME
     529        67330 :                 .with_label_values(&["close"])
     530        67330 :                 .observe_closure_duration(|| drop(slot_guard.file.take()));
     531        67330 :         }
     532        69968 :     }
     533              : }
     534              : 
     535              : impl Write for VirtualFile {
     536      8304278 :     fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
     537      8304278 :         let pos = self.pos;
     538      8304278 :         let n = self.write_at(buf, pos)?;
     539      8304277 :         self.pos += n as u64;
     540      8304277 :         Ok(n)
     541      8304278 :     }
     542              : 
     543            0 :     fn flush(&mut self) -> Result<(), std::io::Error> {
     544            0 :         // flush is no-op for File (at least on unix), so we don't need to do
     545            0 :         // anything here either.
     546            0 :         Ok(())
     547            0 :     }
     548              : }
     549              : 
     550              : impl OpenFiles {
     551          576 :     fn new(num_slots: usize) -> OpenFiles {
     552          576 :         let mut slots = Box::new(Vec::with_capacity(num_slots));
     553        57510 :         for _ in 0..num_slots {
     554        57510 :             let slot = Slot {
     555        57510 :                 recently_used: AtomicBool::new(false),
     556        57510 :                 inner: RwLock::new(SlotInner { tag: 0, file: None }),
     557        57510 :             };
     558        57510 :             slots.push(slot);
     559        57510 :         }
     560              : 
     561          576 :         OpenFiles {
     562          576 :             next: AtomicUsize::new(0),
     563          576 :             slots: Box::leak(slots),
     564          576 :         }
     565          576 :     }
     566              : }
     567              : 
     568              : ///
     569              : /// Initialize the virtual file module. This must be called once at page
     570              : /// server startup.
     571              : ///
     572          575 : pub fn init(num_slots: usize) {
     573          575 :     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
     574            0 :         panic!("virtual_file::init called twice");
     575          575 :     }
     576          575 : }
     577              : 
     578              : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
     579              : 
     580              : // Get a handle to the global slots array.
     581     19564806 : fn get_open_files() -> &'static OpenFiles {
     582     19564806 :     //
     583     19564806 :     // In unit tests, page server startup doesn't happen and no one calls
     584     19564806 :     // virtual_file::init(). Initialize it here, with a small array.
     585     19564806 :     //
     586     19564806 :     // This applies to the virtual file tests below, but all other unit
     587     19564806 :     // tests too, so the virtual file facility is always usable in
     588     19564806 :     // unit tests.
     589     19564806 :     //
     590     19564806 :     if cfg!(test) {
     591       136446 :         OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
     592              :     } else {
     593     19428360 :         OPEN_FILES.get().expect("virtual_file::init not called yet")
     594              :     }
     595     19564806 : }
     596              : 
     597              : #[cfg(test)]
     598              : mod tests {
     599              :     use super::*;
     600              :     use rand::seq::SliceRandom;
     601              :     use rand::thread_rng;
     602              :     use rand::Rng;
     603              :     use std::sync::Arc;
     604              : 
     605              :     enum MaybeVirtualFile {
     606              :         VirtualFile(VirtualFile),
     607              :         File(File),
     608              :     }
     609              : 
     610              :     impl MaybeVirtualFile {
     611          202 :         async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
     612          202 :             match self {
     613          101 :                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
     614          101 :                 MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
     615              :             }
     616          202 :         }
     617            4 :         async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
     618            4 :             match self {
     619            2 :                 MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
     620            2 :                 MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
     621              :             }
     622            4 :         }
     623           18 :         async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
     624           18 :             match self {
     625            9 :                 MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
     626            9 :                 MaybeVirtualFile::File(file) => file.seek(pos),
     627              :             }
     628           18 :         }
     629            4 :         async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
     630            4 :             match self {
     631            2 :                 MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
     632            2 :                 MaybeVirtualFile::File(file) => file.write_all(buf),
     633              :             }
     634            4 :         }
     635              : 
     636              :         // Helper function to slurp contents of a file, starting at the current position,
     637              :         // into a string
     638          218 :         async fn read_string(&mut self) -> Result<String, Error> {
     639          218 :             use std::io::Read;
     640          218 :             let mut buf = String::new();
     641          218 :             match self {
     642          109 :                 MaybeVirtualFile::VirtualFile(file) => {
     643          109 :                     let mut buf = Vec::new();
     644          109 :                     file.read_to_end(&mut buf).await?;
     645          108 :                     return Ok(String::from_utf8(buf).unwrap());
     646              :                 }
     647          109 :                 MaybeVirtualFile::File(file) => {
     648          109 :                     file.read_to_string(&mut buf)?;
     649              :                 }
     650              :             }
     651          108 :             Ok(buf)
     652          218 :         }
     653              : 
     654              :         // Helper function to slurp a portion of a file into a string
     655          202 :         async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
     656          202 :             let mut buf = vec![0; len];
     657          202 :             self.read_exact_at(&mut buf, pos).await?;
     658          202 :             Ok(String::from_utf8(buf).unwrap())
     659          202 :         }
     660              :     }
     661              : 
     662            1 :     #[tokio::test]
     663            1 :     async fn test_virtual_files() -> Result<(), Error> {
     664            1 :         // The real work is done in the test_files() helper function. This
     665            1 :         // allows us to run the same set of tests against a native File, and
     666            1 :         // VirtualFile. We trust the native Files and wouldn't need to test them,
     667            1 :         // but this allows us to verify that the operations return the same
     668            1 :         // results with VirtualFiles as with native Files. (Except that with
     669            1 :         // native files, you will run out of file descriptors if the ulimit
     670            1 :         // is low enough.)
     671          103 :         test_files("virtual_files", |path, open_options| {
     672          103 :             let vf = VirtualFile::open_with_options(path, open_options)?;
     673          103 :             Ok(MaybeVirtualFile::VirtualFile(vf))
     674          103 :         })
     675            0 :         .await
     676              :     }
     677              : 
     678            1 :     #[tokio::test]
     679            1 :     async fn test_physical_files() -> Result<(), Error> {
     680          103 :         test_files("physical_files", |path, open_options| {
     681          103 :             Ok(MaybeVirtualFile::File(open_options.open(path)?))
     682          103 :         })
     683            0 :         .await
     684              :     }
     685              : 
     686            2 :     async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
     687            2 :     where
     688            2 :         OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
     689            2 :     {
     690            2 :         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
     691            2 :         std::fs::create_dir_all(&testdir)?;
     692              : 
     693            2 :         let path_a = testdir.join("file_a");
     694            2 :         let mut file_a = openfunc(
     695            2 :             &path_a,
     696            2 :             OpenOptions::new().write(true).create(true).truncate(true),
     697            2 :         )?;
     698            2 :         file_a.write_all(b"foobar").await?;
     699              : 
     700              :         // cannot read from a file opened in write-only mode
     701            2 :         let _ = file_a.read_string().await.unwrap_err();
     702              : 
     703              :         // Close the file and re-open for reading
     704            2 :         let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
     705              : 
     706              :         // cannot write to a file opened in read-only mode
     707            2 :         let _ = file_a.write_all(b"bar").await.unwrap_err();
     708            2 : 
     709            2 :         // Try simple read
     710            2 :         assert_eq!("foobar", file_a.read_string().await?);
     711              : 
     712              :         // It's positioned at the EOF now.
     713            2 :         assert_eq!("", file_a.read_string().await?);
     714              : 
     715              :         // Test seeks.
     716            2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     717            2 :         assert_eq!("oobar", file_a.read_string().await?);
     718              : 
     719            2 :         assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
     720            2 :         assert_eq!("ar", file_a.read_string().await?);
     721              : 
     722            2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     723            2 :         assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
     724            2 :         assert_eq!("bar", file_a.read_string().await?);
     725              : 
     726            2 :         assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
     727            2 :         assert_eq!("oobar", file_a.read_string().await?);
     728              : 
     729              :         // Test erroneous seeks to before byte 0
     730            2 :         file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
     731            2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     732            2 :         file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
     733            2 : 
     734            2 :         // the erroneous seek should have left the position unchanged
     735            2 :         assert_eq!("oobar", file_a.read_string().await?);
     736              : 
     737              :         // Create another test file, and try FileExt functions on it.
     738            2 :         let path_b = testdir.join("file_b");
     739            2 :         let mut file_b = openfunc(
     740            2 :             &path_b,
     741            2 :             OpenOptions::new()
     742            2 :                 .read(true)
     743            2 :                 .write(true)
     744            2 :                 .create(true)
     745            2 :                 .truncate(true),
     746            2 :         )?;
     747            2 :         file_b.write_all_at(b"BAR", 3).await?;
     748            2 :         file_b.write_all_at(b"FOO", 0).await?;
     749              : 
     750            2 :         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
     751              : 
     752              :         // Open a lot of files, enough to cause some evictions. (Or to be precise,
     753              :         // open the same file many times. The effect is the same.)
     754              :         //
     755              :         // leave file_a positioned at offset 1 before we start
     756            2 :         assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
     757              : 
     758            2 :         let mut vfiles = Vec::new();
     759          202 :         for _ in 0..100 {
     760          200 :             let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
     761          200 :             assert_eq!("FOOBAR", vfile.read_string().await?);
     762          200 :             vfiles.push(vfile);
     763              :         }
     764              : 
     765              :         // make sure we opened enough files to definitely cause evictions.
     766            2 :         assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
     767              : 
     768              :         // The underlying file descriptor for 'file_a' should be closed now. Try to read
     769              :         // from it again. We left the file positioned at offset 1 above.
     770            2 :         assert_eq!("oobar", file_a.read_string().await?);
     771              : 
     772              :         // Check that all the other FDs still work too. Use them in random order for
     773              :         // good measure.
     774            2 :         vfiles.as_mut_slice().shuffle(&mut thread_rng());
     775          200 :         for vfile in vfiles.iter_mut() {
     776          200 :             assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
     777              :         }
     778              : 
     779            2 :         Ok(())
     780            2 :     }
     781              : 
     782              :     /// Test using VirtualFiles from many threads concurrently. This tests both using
     783              :     /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
     784              :     /// VirtualFile from multiple threads concurrently.
     785            1 :     #[test]
     786            1 :     fn test_vfile_concurrency() -> Result<(), Error> {
     787            1 :         const SIZE: usize = 8 * 1024;
     788            1 :         const VIRTUAL_FILES: usize = 100;
     789            1 :         const THREADS: usize = 100;
     790            1 :         const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
     791            1 : 
     792            1 :         let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
     793            1 :         std::fs::create_dir_all(&testdir)?;
     794              : 
     795              :         // Create a test file.
     796            1 :         let test_file_path = testdir.join("concurrency_test_file");
     797              :         {
     798            1 :             let file = File::create(&test_file_path)?;
     799            1 :             file.write_all_at(&SAMPLE, 0)?;
     800              :         }
     801              : 
     802              :         // Open the file many times.
     803            1 :         let mut files = Vec::new();
     804          101 :         for _ in 0..VIRTUAL_FILES {
     805          100 :             let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
     806          100 :             files.push(f);
     807              :         }
     808            1 :         let files = Arc::new(files);
     809            1 : 
     810            1 :         // Launch many threads, and use the virtual files concurrently in random order.
     811            1 :         let rt = tokio::runtime::Builder::new_multi_thread()
     812            1 :             .worker_threads(THREADS)
     813            1 :             .thread_name("test_vfile_concurrency thread")
     814            1 :             .build()
     815            1 :             .unwrap();
     816          101 :         for _threadno in 0..THREADS {
     817          100 :             let files = files.clone();
     818          100 :             rt.spawn(async move {
     819           77 :                 let mut buf = [0u8; SIZE];
     820           77 :                 let mut rng = rand::rngs::OsRng;
     821        77000 :                 for _ in 1..1000 {
     822        76923 :                     let f = &files[rng.gen_range(0..files.len())];
     823        76923 :                     f.read_exact_at(&mut buf, 0).await.unwrap();
     824        76923 :                     assert!(buf == SAMPLE);
     825              :                 }
     826          100 :             });
     827          100 :         }
     828              : 
     829            1 :         Ok(())
     830            1 :     }
     831              : }
        

Generated by: LCOV version 2.1-beta