LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file - io_engine.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 86.4 % 81 70
Test Date: 2024-02-14 18:05:35 Functions: 83.3 % 18 15

            Line data    Source code
       1              : //! [`super::VirtualFile`] supports different IO engines.
       2              : //!
       3              : //! The [`IoEngineKind`] enum identifies them.
       4              : //!
       5              : //! The choice of IO engine is global.
       6              : //! Initialize using [`init`].
       7              : //!
       8              : //! Then use [`get`] and  [`super::OpenOptions`].
       9              : 
      10              : pub(crate) use super::api::IoEngineKind;
      11            0 : #[derive(Clone, Copy)]
      12              : #[repr(u8)]
      13              : pub(crate) enum IoEngine {
      14              :     NotSet,
      15              :     StdFs,
      16              :     #[cfg(target_os = "linux")]
      17              :     TokioEpollUring,
      18              : }
      19              : 
      20              : impl From<IoEngineKind> for IoEngine {
      21          723 :     fn from(value: IoEngineKind) -> Self {
      22          723 :         match value {
      23          674 :             IoEngineKind::StdFs => IoEngine::StdFs,
      24              :             #[cfg(target_os = "linux")]
      25           49 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      26              :         }
      27          723 :     }
      28              : }
      29              : 
      30              : impl TryFrom<u8> for IoEngine {
      31              :     type Error = u8;
      32              : 
      33      5208058 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      34      5208058 :         Ok(match value {
      35      5208058 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      36      5207960 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      37              :             #[cfg(target_os = "linux")]
      38       145705 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      39            0 :             x => return Err(x),
      40              :         })
      41      5208058 :     }
      42              : }
      43              : 
      44              : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      45              : 
      46          723 : pub(crate) fn set(engine_kind: IoEngineKind) {
      47          723 :     let engine: IoEngine = engine_kind.into();
      48          723 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      49          723 :     #[cfg(not(test))]
      50          723 :     {
      51          723 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      52          723 :         metric.reset();
      53          723 :         metric
      54          723 :             .with_label_values(&[&format!("{engine_kind}")])
      55          723 :             .set(1);
      56          723 :     }
      57          723 : }
      58              : 
      59              : #[cfg(not(test))]
      60          625 : pub(super) fn init(engine_kind: IoEngineKind) {
      61          625 :     set(engine_kind);
      62          625 : }
      63              : 
      64      5208058 : pub(super) fn get() -> IoEngine {
      65      5208058 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      66      5208058 :     if cfg!(test) {
      67       291665 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      68       291665 :         match cur {
      69              :             IoEngine::NotSet => {
      70           98 :                 let kind = match std::env::var(env_var_name) {
      71           98 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      72           98 :                         Ok(engine_kind) => engine_kind,
      73            0 :                         Err(e) => {
      74            0 :                             panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
      75              :                         }
      76              :                     },
      77              :                     Err(std::env::VarError::NotPresent) => {
      78            0 :                         crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
      79            0 :                             .parse()
      80            0 :                             .unwrap()
      81              :                     }
      82              :                     Err(std::env::VarError::NotUnicode(_)) => {
      83            0 :                         panic!("env var {env_var_name} is not unicode");
      84              :                     }
      85              :                 };
      86           98 :                 self::set(kind);
      87           98 :                 self::get()
      88              :             }
      89       291567 :             x => x,
      90              :         }
      91              :     } else {
      92      4916393 :         cur
      93              :     }
      94      5208058 : }
      95              : 
      96              : use std::{
      97              :     os::unix::prelude::FileExt,
      98              :     sync::atomic::{AtomicU8, Ordering},
      99              : };
     100              : 
     101              : use super::FileGuard;
     102              : 
     103              : impl IoEngine {
     104      5145996 :     pub(super) async fn read_at<B>(
     105      5145996 :         &self,
     106      5145996 :         file_guard: FileGuard,
     107      5145996 :         offset: u64,
     108      5145996 :         mut buf: B,
     109      5145996 :     ) -> ((FileGuard, B), std::io::Result<usize>)
     110      5145996 :     where
     111      5145996 :         B: tokio_epoll_uring::BoundedBufMut + Send,
     112      5145996 :     {
     113      5145996 :         match self {
     114            0 :             IoEngine::NotSet => panic!("not initialized"),
     115              :             IoEngine::StdFs => {
     116              :                 // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
     117      5001285 :                 let dst = unsafe {
     118      5001285 :                     std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
     119      5001285 :                 };
     120      5001285 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
     121      5001285 :                 if let Ok(nbytes) = &res {
     122      5001284 :                     assert!(*nbytes <= buf.bytes_total());
     123              :                     // SAFETY: see above assertion
     124      5001284 :                     unsafe {
     125      5001284 :                         buf.set_init(*nbytes);
     126      5001284 :                     }
     127            1 :                 }
     128              :                 #[allow(dropping_references)]
     129      5001285 :                 drop(dst);
     130      5001285 :                 ((file_guard, buf), res)
     131              :             }
     132              :             #[cfg(target_os = "linux")]
     133              :             IoEngine::TokioEpollUring => {
     134       144711 :                 let system = tokio_epoll_uring::thread_local_system().await;
     135       144726 :                 let (resources, res) = system.read(file_guard, offset, buf).await;
     136       144711 :                 (
     137       144711 :                     resources,
     138       144711 :                     res.map_err(|e| match e {
     139            1 :                         tokio_epoll_uring::Error::Op(e) => e,
     140            0 :                         tokio_epoll_uring::Error::System(system) => {
     141            0 :                             std::io::Error::new(std::io::ErrorKind::Other, system)
     142              :                         }
     143       144711 :                     }),
     144       144711 :                 )
     145              :             }
     146              :         }
     147      5145996 :     }
     148              : }
        

Generated by: LCOV version 2.1-beta