LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file - io_engine.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 81.5 % 81 66
Test Date: 2024-02-29 11:57:12 Functions: 77.3 % 22 17

            Line data    Source code
       1              : //! [`super::VirtualFile`] supports different IO engines.
       2              : //!
       3              : //! The [`IoEngineKind`] enum identifies them.
       4              : //!
       5              : //! The choice of IO engine is global.
       6              : //! Initialize using [`init`].
       7              : //!
       8              : //! Then use [`get`] and  [`super::OpenOptions`].
       9              : 
      10              : pub(crate) use super::api::IoEngineKind;
      11            0 : #[derive(Clone, Copy)]
      12              : #[repr(u8)]
      13              : pub(crate) enum IoEngine {
      14              :     NotSet,
      15              :     StdFs,
      16              :     #[cfg(target_os = "linux")]
      17              :     TokioEpollUring,
      18              : }
      19              : 
      20              : impl From<IoEngineKind> for IoEngine {
      21          102 :     fn from(value: IoEngineKind) -> Self {
      22          102 :         match value {
      23           51 :             IoEngineKind::StdFs => IoEngine::StdFs,
      24              :             #[cfg(target_os = "linux")]
      25           51 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      26              :         }
      27          102 :     }
      28              : }
      29              : 
      30              : impl TryFrom<u8> for IoEngine {
      31              :     type Error = u8;
      32              : 
      33       313156 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      34       313156 :         Ok(match value {
      35       313156 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      36       313054 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      37              :             #[cfg(target_os = "linux")]
      38       156433 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      39            0 :             x => return Err(x),
      40              :         })
      41       313156 :     }
      42              : }
      43              : 
      44              : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      45              : 
      46          102 : pub(crate) fn set(engine_kind: IoEngineKind) {
      47          102 :     let engine: IoEngine = engine_kind.into();
      48          102 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      49          102 :     #[cfg(not(test))]
      50          102 :     {
      51          102 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      52          102 :         metric.reset();
      53          102 :         metric
      54          102 :             .with_label_values(&[&format!("{engine_kind}")])
      55          102 :             .set(1);
      56          102 :     }
      57          102 : }
      58              : 
      59              : #[cfg(not(test))]
      60            0 : pub(super) fn init(engine_kind: IoEngineKind) {
      61            0 :     set(engine_kind);
      62            0 : }
      63              : 
      64       313156 : pub(super) fn get() -> IoEngine {
      65       313156 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      66       313156 :     if cfg!(test) {
      67       313156 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      68       313156 :         match cur {
      69              :             IoEngine::NotSet => {
      70          102 :                 let kind = match std::env::var(env_var_name) {
      71          102 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      72          102 :                         Ok(engine_kind) => engine_kind,
      73            0 :                         Err(e) => {
      74            0 :                             panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
      75              :                         }
      76              :                     },
      77              :                     Err(std::env::VarError::NotPresent) => {
      78            0 :                         crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
      79            0 :                             .parse()
      80            0 :                             .unwrap()
      81              :                     }
      82              :                     Err(std::env::VarError::NotUnicode(_)) => {
      83            0 :                         panic!("env var {env_var_name} is not unicode");
      84              :                     }
      85              :                 };
      86          102 :                 self::set(kind);
      87          102 :                 self::get()
      88              :             }
      89       313054 :             x => x,
      90              :         }
      91              :     } else {
      92            0 :         cur
      93              :     }
      94       313156 : }
      95              : 
      96              : use std::{
      97              :     os::unix::prelude::FileExt,
      98              :     sync::atomic::{AtomicU8, Ordering},
      99              : };
     100              : 
     101              : use super::FileGuard;
     102              : 
     103              : impl IoEngine {
     104       310738 :     pub(super) async fn read_at<B>(
     105       310738 :         &self,
     106       310738 :         file_guard: FileGuard,
     107       310738 :         offset: u64,
     108       310738 :         mut buf: B,
     109       310738 :     ) -> ((FileGuard, B), std::io::Result<usize>)
     110       310738 :     where
     111       310738 :         B: tokio_epoll_uring::BoundedBufMut + Send,
     112       310738 :     {
     113       310738 :         match self {
     114            0 :             IoEngine::NotSet => panic!("not initialized"),
     115              :             IoEngine::StdFs => {
     116              :                 // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
     117       155463 :                 let dst = unsafe {
     118       155463 :                     std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
     119       155463 :                 };
     120       155463 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
     121       155463 :                 if let Ok(nbytes) = &res {
     122       155462 :                     assert!(*nbytes <= buf.bytes_total());
     123              :                     // SAFETY: see above assertion
     124       155462 :                     unsafe {
     125       155462 :                         buf.set_init(*nbytes);
     126       155462 :                     }
     127            1 :                 }
     128              :                 #[allow(dropping_references)]
     129       155463 :                 drop(dst);
     130       155463 :                 ((file_guard, buf), res)
     131              :             }
     132              :             #[cfg(target_os = "linux")]
     133              :             IoEngine::TokioEpollUring => {
     134       155275 :                 let system = tokio_epoll_uring::thread_local_system().await;
     135       155292 :                 let (resources, res) = system.read(file_guard, offset, buf).await;
     136       155275 :                 (
     137       155275 :                     resources,
     138       155275 :                     res.map_err(|e| match e {
     139            1 :                         tokio_epoll_uring::Error::Op(e) => e,
     140            0 :                         tokio_epoll_uring::Error::System(system) => {
     141            0 :                             std::io::Error::new(std::io::ErrorKind::Other, system)
     142              :                         }
     143       155275 :                     }),
     144       155275 :                 )
     145              :             }
     146              :         }
     147       310738 :     }
     148              : }
        

Generated by: LCOV version 2.1-beta