LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file - io_engine.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 69.0 % 197 136
Test Date: 2025-03-12 18:28:53 Functions: 86.0 % 43 37

            Line data    Source code
//! [`super::VirtualFile`] supports different IO engines.
//!
//! The [`IoEngineKind`] enum identifies them.
//!
//! The choice of IO engine is global.
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
      11              : 
      12              : #[cfg(target_os = "linux")]
      13              : pub(super) mod tokio_epoll_uring_ext;
      14              : 
      15              : use tokio_epoll_uring::IoBuf;
      16              : use tracing::Instrument;
      17              : 
      18              : pub(crate) use super::api::IoEngineKind;
      19              : #[derive(Clone, Copy)]
      20              : #[repr(u8)]
      21              : pub(crate) enum IoEngine {
      22              :     NotSet,
      23              :     StdFs,
      24              :     #[cfg(target_os = "linux")]
      25              :     TokioEpollUring,
      26              : }
      27              : 
      28              : impl From<IoEngineKind> for IoEngine {
      29          476 :     fn from(value: IoEngineKind) -> Self {
      30          476 :         match value {
      31          238 :             IoEngineKind::StdFs => IoEngine::StdFs,
      32              :             #[cfg(target_os = "linux")]
      33          238 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      34              :         }
      35          476 :     }
      36              : }
      37              : 
      38              : impl TryFrom<u8> for IoEngine {
      39              :     type Error = u8;
      40              : 
      41      3266218 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      42      3266218 :         Ok(match value {
      43      3266218 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      44      3265742 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      45              :             #[cfg(target_os = "linux")]
      46      1632870 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      47            0 :             x => return Err(x),
      48              :         })
      49      3266218 :     }
      50              : }
      51              : 
/// Process-global IO engine selection, holding the `u8` discriminant of an [`IoEngine`].
///
/// Starts out as [`IoEngine::NotSet`]; written by [`set`] and decoded by [`get`].
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      53              : 
      54          476 : pub(crate) fn set(engine_kind: IoEngineKind) {
      55          476 :     let engine: IoEngine = engine_kind.into();
      56          476 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      57          476 :     #[cfg(not(test))]
      58          476 :     {
      59          476 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      60          476 :         metric.reset();
      61          476 :         metric
      62          476 :             .with_label_values(&[&format!("{engine_kind}")])
      63          476 :             .set(1);
      64          476 :     }
      65          476 : }
      66              : 
      67              : #[cfg(not(test))]
      68            0 : pub(super) fn init(engine_kind: IoEngineKind) {
      69            0 :     set(engine_kind);
      70            0 : }
      71              : 
      72              : /// Longer-term, this API should only be used by [`super::VirtualFile`].
      73      3266218 : pub(crate) fn get() -> IoEngine {
      74      3266218 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      75      3266218 :     if cfg!(test) {
      76      3266218 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      77      3266218 :         match cur {
      78              :             IoEngine::NotSet => {
      79          476 :                 let kind = match std::env::var(env_var_name) {
      80          476 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      81          476 :                         Ok(engine_kind) => engine_kind,
      82            0 :                         Err(e) => {
      83            0 :                             panic!(
      84            0 :                                 "invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
      85            0 :                             )
      86              :                         }
      87              :                     },
      88              :                     Err(std::env::VarError::NotPresent) => {
      89              :                         #[cfg(target_os = "linux")]
      90              :                         {
      91            0 :                             IoEngineKind::TokioEpollUring
      92              :                         }
      93              :                         #[cfg(not(target_os = "linux"))]
      94              :                         {
      95              :                             IoEngineKind::StdFs
      96              :                         }
      97              :                     }
      98              :                     Err(std::env::VarError::NotUnicode(_)) => {
      99            0 :                         panic!("env var {env_var_name} is not unicode");
     100              :                     }
     101              :                 };
     102          476 :                 self::set(kind);
     103          476 :                 self::get()
     104              :             }
     105      3265742 :             x => x,
     106              :         }
     107              :     } else {
     108            0 :         cur
     109              :     }
     110      3266218 : }
     111              : 
     112              : use std::os::unix::prelude::FileExt;
     113              : use std::sync::atomic::{AtomicU8, Ordering};
     114              : 
     115              : use super::owned_buffers_io::io_buf_ext::FullSlice;
     116              : use super::owned_buffers_io::slice::SliceMutExt;
     117              : use super::{FileGuard, Metadata};
     118              : 
     119              : #[cfg(target_os = "linux")]
     120            4 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
     121            4 :     match e {
     122            4 :         tokio_epoll_uring::Error::Op(e) => e,
     123            0 :         tokio_epoll_uring::Error::System(system) => {
     124            0 :             std::io::Error::new(std::io::ErrorKind::Other, system)
     125              :         }
     126              :     }
     127            4 : }
     128              : 
     129              : impl IoEngine {
     130       967637 :     pub(super) async fn read_at<Buf>(
     131       967637 :         &self,
     132       967637 :         file_guard: FileGuard,
     133       967637 :         offset: u64,
     134       967637 :         mut slice: tokio_epoll_uring::Slice<Buf>,
     135       967637 :     ) -> (
     136       967637 :         (FileGuard, tokio_epoll_uring::Slice<Buf>),
     137       967637 :         std::io::Result<usize>,
     138       967637 :     )
     139       967637 :     where
     140       967637 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     141       967637 :     {
     142       967637 :         match self {
     143            0 :             IoEngine::NotSet => panic!("not initialized"),
     144              :             IoEngine::StdFs => {
     145       483832 :                 let rust_slice = slice.as_mut_rust_slice_full_zeroed();
     146       483832 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
     147       483832 :                 ((file_guard, slice), res)
     148              :             }
     149              :             #[cfg(target_os = "linux")]
     150              :             IoEngine::TokioEpollUring => {
     151       483805 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     152       483805 :                 let (resources, res) = system.read(file_guard, offset, slice).await;
     153       483805 :                 (resources, res.map_err(epoll_uring_error_to_std))
     154              :             }
     155              :         }
     156       967637 :     }
     157         5650 :     pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
     158         5650 :         match self {
     159            0 :             IoEngine::NotSet => panic!("not initialized"),
     160              :             IoEngine::StdFs => {
     161         2818 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_all());
     162         2818 :                 (file_guard, res)
     163              :             }
     164              :             #[cfg(target_os = "linux")]
     165              :             IoEngine::TokioEpollUring => {
     166         2832 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     167         2832 :                 let (resources, res) = system.fsync(file_guard).await;
     168         2832 :                 (resources, res.map_err(epoll_uring_error_to_std))
     169              :             }
     170              :         }
     171         5650 :     }
     172            0 :     pub(super) async fn sync_data(
     173            0 :         &self,
     174            0 :         file_guard: FileGuard,
     175            0 :     ) -> (FileGuard, std::io::Result<()>) {
     176            0 :         match self {
     177            0 :             IoEngine::NotSet => panic!("not initialized"),
     178              :             IoEngine::StdFs => {
     179            0 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_data());
     180            0 :                 (file_guard, res)
     181              :             }
     182              :             #[cfg(target_os = "linux")]
     183              :             IoEngine::TokioEpollUring => {
     184            0 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     185            0 :                 let (resources, res) = system.fdatasync(file_guard).await;
     186            0 :                 (resources, res.map_err(epoll_uring_error_to_std))
     187              :             }
     188              :         }
     189            0 :     }
     190         3620 :     pub(super) async fn metadata(
     191         3620 :         &self,
     192         3620 :         file_guard: FileGuard,
     193         3620 :     ) -> (FileGuard, std::io::Result<Metadata>) {
     194         3620 :         match self {
     195            0 :             IoEngine::NotSet => panic!("not initialized"),
     196              :             IoEngine::StdFs => {
     197         1810 :                 let res =
     198         1810 :                     file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
     199         1810 :                 (file_guard, res)
     200              :             }
     201              :             #[cfg(target_os = "linux")]
     202              :             IoEngine::TokioEpollUring => {
     203         1810 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     204         1810 :                 let (resources, res) = system.statx(file_guard).await;
     205         1810 :                 (
     206         1810 :                     resources,
     207         1810 :                     res.map_err(epoll_uring_error_to_std).map(Metadata::from),
     208         1810 :                 )
     209              :             }
     210              :         }
     211         3620 :     }
     212      2274177 :     pub(super) async fn write_at<B: IoBuf + Send>(
     213      2274177 :         &self,
     214      2274177 :         file_guard: FileGuard,
     215      2274177 :         offset: u64,
     216      2274177 :         buf: FullSlice<B>,
     217      2274177 :     ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
     218      2274177 :         match self {
     219            0 :             IoEngine::NotSet => panic!("not initialized"),
     220              :             IoEngine::StdFs => {
     221      1137090 :                 let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
     222      1137090 :                 ((file_guard, buf), result)
     223              :             }
     224              :             #[cfg(target_os = "linux")]
     225              :             IoEngine::TokioEpollUring => {
     226      1137087 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     227      1137087 :                 let ((file_guard, slice), res) =
     228      1137087 :                     system.write(file_guard, offset, buf.into_raw_slice()).await;
     229      1137087 :                 (
     230      1137087 :                     (file_guard, FullSlice::must_new(slice)),
     231      1137087 :                     res.map_err(epoll_uring_error_to_std),
     232      1137087 :                 )
     233              :             }
     234              :         }
     235      2274177 :     }
     236              : 
     237              :     /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
     238              :     /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
     239              :     /// whereas before the switch to [`super::io_engine`], that wasn't the case.
     240              :     /// This method helps avoid such a regression.
     241              :     ///
     242              :     /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
     243           28 :     pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
     244           28 :     where
     245           28 :         Fut: 'static + Send + std::future::Future<Output = R>,
     246           28 :         R: 'static + Send,
     247           28 :     {
     248           28 :         match self {
     249            0 :             IoEngine::NotSet => panic!("not initialized"),
     250              :             IoEngine::StdFs => {
     251           14 :                 let span = tracing::info_span!("spawn_blocking_block_on_if_std");
     252           14 :                 tokio::task::spawn_blocking({
     253           14 :                     move || tokio::runtime::Handle::current().block_on(work.instrument(span))
     254           14 :                 })
     255           14 :                 .await
     256           14 :                 .expect("failed to join blocking code most likely it panicked, panicking as well")
     257              :             }
     258              :             #[cfg(target_os = "linux")]
     259           14 :             IoEngine::TokioEpollUring => work.await,
     260              :         }
     261           28 :     }
     262              : }
     263              : 
/// Outcome of [`feature_test`]: which IO engine this host should use.
pub enum FeatureTestResult {
    /// The platform-preferred engine is usable and is the one carried here.
    PlatformPreferred(IoEngineKind),
    /// The preferred engine could not be set up.
    Worse {
        /// The fallback engine to use instead.
        engine: IoEngineKind,
        /// Human-readable explanation of why the preferred engine was not chosen.
        remark: String,
    },
}
     271              : 
     272              : impl FeatureTestResult {
     273              :     #[cfg(target_os = "linux")]
     274              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
     275              :     #[cfg(not(target_os = "linux"))]
     276              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
     277              : }
     278              : 
     279              : impl From<FeatureTestResult> for IoEngineKind {
     280            0 :     fn from(val: FeatureTestResult) -> Self {
     281            0 :         match val {
     282            0 :             FeatureTestResult::PlatformPreferred(e) => e,
     283            0 :             FeatureTestResult::Worse { engine, .. } => engine,
     284              :         }
     285            0 :     }
     286              : }
     287              : 
     288              : /// Somewhat costly under the hood, do only once.
     289              : /// Panics if we can't set up the feature test.
     290          488 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
     291          488 :     std::thread::spawn(|| {
     292          488 : 
     293          488 :         #[cfg(not(target_os = "linux"))]
     294          488 :         {
     295          488 :             Ok(FeatureTestResult::PlatformPreferred(
     296          488 :                 FeatureTestResult::PLATFORM_PREFERRED,
     297          488 :             ))
     298          488 :         }
     299          488 :         #[cfg(target_os = "linux")]
     300          488 :         {
     301          488 :             let rt = tokio::runtime::Builder::new_current_thread()
     302          488 :                 .enable_all()
     303          488 :                 .build()
     304          488 :                 .unwrap();
     305          488 :             Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
     306              :                 Ok(_) => FeatureTestResult::PlatformPreferred({
     307          488 :                     assert!(matches!(
     308          488 :                         IoEngineKind::TokioEpollUring,
     309              :                         FeatureTestResult::PLATFORM_PREFERRED
     310              :                     ));
     311          488 :                     FeatureTestResult::PLATFORM_PREFERRED
     312              :                 }),
     313            0 :                 Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
     314            0 :                     let remark = match e.raw_os_error() {
     315              :                         Some(nix::libc::EPERM) => {
     316              :                             // fall back
     317            0 :                             "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
     318            0 :                                 .to_string()
     319              :                         }
     320              :                     Some(nix::libc::EFAULT) => {
     321              :                             // fail feature test
     322            0 :                             anyhow::bail!(
     323            0 :                                 "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
     324            0 :                             );
     325              :                         }
     326              :                         Some(_) | None => {
     327              :                             // fall back
     328            0 :                             format!("creating tokio-epoll-uring fails with error: {e:#}")
     329              :                         }
     330              :                 };
     331            0 :                     FeatureTestResult::Worse {
     332            0 :                         engine: IoEngineKind::StdFs,
     333            0 :                         remark,
     334            0 :                     }
     335              :                 }
     336              :             })
     337              :         }
     338          488 :     })
     339          488 :     .join()
     340          488 :     .unwrap()
     341          488 : }
     342              : 
     343              : /// For use in benchmark binaries only.
     344              : ///
     345              : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
     346              : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
     347              : /// developer time trying to figure out why it's slow.
     348              : ///
     349              : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
     350            0 : pub fn io_engine_for_bench() -> IoEngineKind {
     351            0 :     #[cfg(not(target_os = "linux"))]
     352            0 :     {
     353            0 :         panic!("This benchmark does I/O and can only give a representative result on Linux");
     354            0 :     }
     355            0 :     #[cfg(target_os = "linux")]
     356            0 :     {
     357            0 :         match feature_test().unwrap() {
     358            0 :             FeatureTestResult::PlatformPreferred(engine) => engine,
     359              :             FeatureTestResult::Worse {
     360            0 :                 engine: _engine,
     361            0 :                 remark,
     362            0 :             } => {
     363            0 :                 panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
     364              :             }
     365              :         }
     366              :     }
     367            0 : }
        

Generated by: LCOV version 2.1-beta