LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file - io_engine.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 70.3 % 209 147
Test Date: 2025-04-24 20:31:15 Functions: 76.3 % 38 29

            Line data    Source code
       1              : //! [`super::VirtualFile`] supports different IO engines.
       2              : //!
       3              : //! The [`IoEngineKind`] enum identifies them.
       4              : //!
       5              : //! The choice of IO engine is global.
       6              : //! Initialize using [`init`].
       7              : //!
       8              : //! Then use [`get`] and  [`super::OpenOptions`].
       9              : //!
      10              : //!
      11              : 
      12              : #[cfg(target_os = "linux")]
      13              : pub(super) mod tokio_epoll_uring_ext;
      14              : 
      15              : use tokio_epoll_uring::IoBuf;
      16              : use tracing::Instrument;
      17              : 
      18              : pub(crate) use super::api::IoEngineKind;
      19              : #[derive(Clone, Copy)]
      20              : #[repr(u8)]
      21              : pub(crate) enum IoEngine {
      22              :     NotSet,
      23              :     StdFs,
      24              :     #[cfg(target_os = "linux")]
      25              :     TokioEpollUring,
      26              : }
      27              : 
      28              : impl From<IoEngineKind> for IoEngine {
      29         1464 :     fn from(value: IoEngineKind) -> Self {
      30         1464 :         match value {
      31          732 :             IoEngineKind::StdFs => IoEngine::StdFs,
      32              :             #[cfg(target_os = "linux")]
      33          732 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      34              :         }
      35         1464 :     }
      36              : }
      37              : 
      38              : impl TryFrom<u8> for IoEngine {
      39              :     type Error = u8;
      40              : 
      41      3599394 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      42      3599394 :         Ok(match value {
      43      3599394 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      44      3597930 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      45              :             #[cfg(target_os = "linux")]
      46      1799090 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      47            0 :             x => return Err(x),
      48              :         })
      49      3599394 :     }
      50              : }
      51              : 
      52              : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      53              : 
      54         1464 : pub(crate) fn set(engine_kind: IoEngineKind) {
      55         1464 :     let engine: IoEngine = engine_kind.into();
      56         1464 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      57         1464 :     #[cfg(not(test))]
      58         1464 :     {
      59         1464 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      60         1464 :         metric.reset();
      61         1464 :         metric
      62         1464 :             .with_label_values(&[&format!("{engine_kind}")])
      63         1464 :             .set(1);
      64         1464 :     }
      65         1464 : }
      66              : 
      67              : #[cfg(not(test))]
      68            0 : pub(super) fn init(engine_kind: IoEngineKind) {
      69            0 :     set(engine_kind);
      70            0 : }
      71              : 
      72              : /// Longer-term, this API should only be used by [`super::VirtualFile`].
      73      3599394 : pub(crate) fn get() -> IoEngine {
      74      3599394 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      75      3599394 :     if cfg!(test) {
      76      3599394 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      77      3599394 :         match cur {
      78              :             IoEngine::NotSet => {
      79         1464 :                 let kind = match std::env::var(env_var_name) {
      80         1464 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      81         1464 :                         Ok(engine_kind) => engine_kind,
      82            0 :                         Err(e) => {
      83            0 :                             panic!(
      84            0 :                                 "invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
      85            0 :                             )
      86              :                         }
      87              :                     },
      88              :                     Err(std::env::VarError::NotPresent) => {
      89              :                         #[cfg(target_os = "linux")]
      90              :                         {
      91            0 :                             IoEngineKind::TokioEpollUring
      92              :                         }
      93              :                         #[cfg(not(target_os = "linux"))]
      94              :                         {
      95              :                             IoEngineKind::StdFs
      96              :                         }
      97              :                     }
      98              :                     Err(std::env::VarError::NotUnicode(_)) => {
      99            0 :                         panic!("env var {env_var_name} is not unicode");
     100              :                     }
     101              :                 };
     102         1464 :                 self::set(kind);
     103         1464 :                 self::get()
     104              :             }
     105      3597930 :             x => x,
     106              :         }
     107              :     } else {
     108            0 :         cur
     109              :     }
     110      3599394 : }
     111              : 
     112              : use std::os::unix::prelude::FileExt;
     113              : use std::sync::atomic::{AtomicU8, Ordering};
     114              : 
     115              : use super::owned_buffers_io::io_buf_ext::FullSlice;
     116              : use super::owned_buffers_io::slice::SliceMutExt;
     117              : use super::{FileGuard, Metadata};
     118              : 
     119              : #[cfg(target_os = "linux")]
     120           12 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
     121           12 :     match e {
     122           12 :         tokio_epoll_uring::Error::Op(e) => e,
     123            0 :         tokio_epoll_uring::Error::System(system) => {
     124            0 :             std::io::Error::new(std::io::ErrorKind::Other, system)
     125              :         }
     126              :     }
     127           12 : }
     128              : 
     129              : impl IoEngine {
     130      3288427 :     pub(super) async fn read_at<Buf>(
     131      3288427 :         &self,
     132      3288427 :         file_guard: FileGuard,
     133      3288427 :         offset: u64,
     134      3288427 :         mut slice: tokio_epoll_uring::Slice<Buf>,
     135      3288427 :     ) -> (
     136      3288427 :         (FileGuard, tokio_epoll_uring::Slice<Buf>),
     137      3288427 :         std::io::Result<usize>,
     138      3288427 :     )
     139      3288427 :     where
     140      3288427 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     141      3288427 :     {
     142      3288427 :         match self {
     143            0 :             IoEngine::NotSet => panic!("not initialized"),
     144              :             IoEngine::StdFs => {
     145      1644086 :                 let rust_slice = slice.as_mut_rust_slice_full_zeroed();
     146      1644086 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
     147      1644086 :                 ((file_guard, slice), res)
     148              :             }
     149              :             #[cfg(target_os = "linux")]
     150              :             IoEngine::TokioEpollUring => {
     151      1644341 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     152      1644341 :                 let (resources, res) = system.read(file_guard, offset, slice).await;
     153      1644341 :                 (resources, res.map_err(epoll_uring_error_to_std))
     154              :             }
     155              :         }
     156      3288427 :     }
     157        17124 :     pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
     158        17124 :         match self {
     159            0 :             IoEngine::NotSet => panic!("not initialized"),
     160              :             IoEngine::StdFs => {
     161         8562 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_all());
     162         8562 :                 (file_guard, res)
     163              :             }
     164              :             #[cfg(target_os = "linux")]
     165              :             IoEngine::TokioEpollUring => {
     166         8562 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     167         8562 :                 let (resources, res) = system.fsync(file_guard).await;
     168         8562 :                 (resources, res.map_err(epoll_uring_error_to_std))
     169              :             }
     170              :         }
     171        17124 :     }
     172            0 :     pub(super) async fn sync_data(
     173            0 :         &self,
     174            0 :         file_guard: FileGuard,
     175            0 :     ) -> (FileGuard, std::io::Result<()>) {
     176            0 :         match self {
     177            0 :             IoEngine::NotSet => panic!("not initialized"),
     178              :             IoEngine::StdFs => {
     179            0 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_data());
     180            0 :                 (file_guard, res)
     181              :             }
     182              :             #[cfg(target_os = "linux")]
     183              :             IoEngine::TokioEpollUring => {
     184            0 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     185            0 :                 let (resources, res) = system.fdatasync(file_guard).await;
     186            0 :                 (resources, res.map_err(epoll_uring_error_to_std))
     187              :             }
     188              :         }
     189            0 :     }
     190        10992 :     pub(super) async fn metadata(
     191        10992 :         &self,
     192        10992 :         file_guard: FileGuard,
     193        10992 :     ) -> (FileGuard, std::io::Result<Metadata>) {
     194        10992 :         match self {
     195            0 :             IoEngine::NotSet => panic!("not initialized"),
     196              :             IoEngine::StdFs => {
     197         5496 :                 let res =
     198         5496 :                     file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
     199         5496 :                 (file_guard, res)
     200              :             }
     201              :             #[cfg(target_os = "linux")]
     202              :             IoEngine::TokioEpollUring => {
     203         5496 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     204         5496 :                 let (resources, res) = system.statx(file_guard).await;
     205         5496 :                 (
     206         5496 :                     resources,
     207         5496 :                     res.map_err(epoll_uring_error_to_std).map(Metadata::from),
     208         5496 :                 )
     209              :             }
     210              :         }
     211        10992 :     }
     212              : 
     213           84 :     pub(super) async fn set_len(
     214           84 :         &self,
     215           84 :         file_guard: FileGuard,
     216           84 :         len: u64,
     217           84 :     ) -> (FileGuard, std::io::Result<()>) {
     218           84 :         match self {
     219            0 :             IoEngine::NotSet => panic!("not initialized"),
     220              :             IoEngine::StdFs => {
     221           42 :                 let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
     222           42 :                 (file_guard, res)
     223              :             }
     224              :             #[cfg(target_os = "linux")]
     225              :             IoEngine::TokioEpollUring => {
     226              :                 // TODO: ftruncate op for tokio-epoll-uring
     227           42 :                 let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
     228           42 :                 (file_guard, res)
     229              :             }
     230              :         }
     231           84 :     }
     232              : 
     233       237203 :     pub(super) async fn write_at<B: IoBuf + Send>(
     234       237203 :         &self,
     235       237203 :         file_guard: FileGuard,
     236       237203 :         offset: u64,
     237       237203 :         buf: FullSlice<B>,
     238       237203 :     ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
     239       237203 :         match self {
     240            0 :             IoEngine::NotSet => panic!("not initialized"),
     241              :             IoEngine::StdFs => {
     242       118604 :                 let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
     243       118604 :                 ((file_guard, buf), result)
     244              :             }
     245              :             #[cfg(target_os = "linux")]
     246              :             IoEngine::TokioEpollUring => {
     247       118599 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     248       118599 :                 let ((file_guard, slice), res) =
     249       118599 :                     system.write(file_guard, offset, buf.into_raw_slice()).await;
     250       118599 :                 (
     251       118599 :                     (file_guard, FullSlice::must_new(slice)),
     252       118599 :                     res.map_err(epoll_uring_error_to_std),
     253       118599 :                 )
     254              :             }
     255              :         }
     256       237203 :     }
     257              : 
     258              :     /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
     259              :     /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
     260              :     /// whereas before the switch to [`super::io_engine`], that wasn't the case.
     261              :     /// This method helps avoid such a regression.
     262              :     ///
     263              :     /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
     264           84 :     pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
     265           84 :     where
     266           84 :         Fut: 'static + Send + std::future::Future<Output = R>,
     267           84 :         R: 'static + Send,
     268           84 :     {
     269           84 :         match self {
     270            0 :             IoEngine::NotSet => panic!("not initialized"),
     271              :             IoEngine::StdFs => {
     272           42 :                 let span = tracing::info_span!("spawn_blocking_block_on_if_std");
     273           42 :                 tokio::task::spawn_blocking({
     274           42 :                     move || tokio::runtime::Handle::current().block_on(work.instrument(span))
     275           42 :                 })
     276           42 :                 .await
     277           42 :                 .expect("failed to join blocking code most likely it panicked, panicking as well")
     278              :             }
     279              :             #[cfg(target_os = "linux")]
     280           42 :             IoEngine::TokioEpollUring => work.await,
     281              :         }
     282           84 :     }
     283              : }
     284              : 
     285              : pub enum FeatureTestResult {
     286              :     PlatformPreferred(IoEngineKind),
     287              :     Worse {
     288              :         engine: IoEngineKind,
     289              :         remark: String,
     290              :     },
     291              : }
     292              : 
     293              : impl FeatureTestResult {
     294              :     #[cfg(target_os = "linux")]
     295              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
     296              :     #[cfg(not(target_os = "linux"))]
     297              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
     298              : }
     299              : 
     300              : impl From<FeatureTestResult> for IoEngineKind {
     301            0 :     fn from(val: FeatureTestResult) -> Self {
     302            0 :         match val {
     303            0 :             FeatureTestResult::PlatformPreferred(e) => e,
     304            0 :             FeatureTestResult::Worse { engine, .. } => engine,
     305              :         }
     306            0 :     }
     307              : }
     308              : 
     309              : /// Somewhat costly under the hood, do only once.
     310              : /// Panics if we can't set up the feature test.
     311         1500 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
     312         1500 :     std::thread::spawn(|| {
     313         1500 : 
     314         1500 :         #[cfg(not(target_os = "linux"))]
     315         1500 :         {
     316         1500 :             Ok(FeatureTestResult::PlatformPreferred(
     317         1500 :                 FeatureTestResult::PLATFORM_PREFERRED,
     318         1500 :             ))
     319         1500 :         }
     320         1500 :         #[cfg(target_os = "linux")]
     321         1500 :         {
     322         1500 :             let rt = tokio::runtime::Builder::new_current_thread()
     323         1500 :                 .enable_all()
     324         1500 :                 .build()
     325         1500 :                 .unwrap();
     326         1500 :             Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
     327              :                 Ok(_) => FeatureTestResult::PlatformPreferred({
     328         1500 :                     assert!(matches!(
     329         1500 :                         IoEngineKind::TokioEpollUring,
     330              :                         FeatureTestResult::PLATFORM_PREFERRED
     331              :                     ));
     332         1500 :                     FeatureTestResult::PLATFORM_PREFERRED
     333              :                 }),
     334            0 :                 Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
     335            0 :                     let remark = match e.raw_os_error() {
     336              :                         Some(nix::libc::EPERM) => {
     337              :                             // fall back
     338            0 :                             "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
     339            0 :                                 .to_string()
     340              :                         }
     341              :                     Some(nix::libc::EFAULT) => {
     342              :                             // fail feature test
     343            0 :                             anyhow::bail!(
     344            0 :                                 "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
     345            0 :                             );
     346              :                         }
     347              :                         Some(_) | None => {
     348              :                             // fall back
     349            0 :                             format!("creating tokio-epoll-uring fails with error: {e:#}")
     350              :                         }
     351              :                 };
     352            0 :                     FeatureTestResult::Worse {
     353            0 :                         engine: IoEngineKind::StdFs,
     354            0 :                         remark,
     355            0 :                     }
     356              :                 }
     357              :             })
     358              :         }
     359         1500 :     })
     360         1500 :     .join()
     361         1500 :     .unwrap()
     362         1500 : }
     363              : 
     364              : /// For use in benchmark binaries only.
     365              : ///
     366              : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
     367              : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
     368              : /// developer time trying to figure out why it's slow.
     369              : ///
     370              : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
     371            0 : pub fn io_engine_for_bench() -> IoEngineKind {
     372            0 :     #[cfg(not(target_os = "linux"))]
     373            0 :     {
     374            0 :         panic!("This benchmark does I/O and can only give a representative result on Linux");
     375            0 :     }
     376            0 :     #[cfg(target_os = "linux")]
     377            0 :     {
     378            0 :         match feature_test().unwrap() {
     379            0 :             FeatureTestResult::PlatformPreferred(engine) => engine,
     380              :             FeatureTestResult::Worse {
     381            0 :                 engine: _engine,
     382            0 :                 remark,
     383            0 :             } => {
     384            0 :                 panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
     385              :             }
     386              :         }
     387              :     }
     388            0 : }
        

Generated by: LCOV version 2.1-beta