LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file - io_engine.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 55.9 % 236 132
Test Date: 2025-07-16 12:29:03 Functions: 53.9 % 76 41

            Line data    Source code
       1              : //! [`super::VirtualFile`] supports different IO engines.
       2              : //!
       3              : //! The [`IoEngineKind`] enum identifies them.
       4              : //!
       5              : //! The choice of IO engine is global.
       6              : //! Initialize using [`init`].
       7              : //!
       8              : //! Then use [`get`] and [`super::OpenOptions`].
       9              : //!
      10              : //!
      11              : 
      12              : #[cfg(target_os = "linux")]
      13              : pub(super) mod tokio_epoll_uring_ext;
      14              : 
      15              : use tokio_epoll_uring::IoBuf;
      16              : use tracing::Instrument;
      17              : 
      18              : pub(crate) use super::api::IoEngineKind;
      19              : #[derive(Clone, Copy)]
      20              : #[repr(u8)]
      21              : pub(crate) enum IoEngine {
      22              :     NotSet,
      23              :     StdFs,
      24              :     #[cfg(target_os = "linux")]
      25              :     TokioEpollUring,
      26              : }
      27              : 
      28              : impl From<IoEngineKind> for IoEngine {
      29          122 :     fn from(value: IoEngineKind) -> Self {
      30          122 :         match value {
      31            0 :             IoEngineKind::StdFs => IoEngine::StdFs,
      32              :             #[cfg(target_os = "linux")]
      33          122 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      34              :         }
      35          122 :     }
      36              : }
      37              : 
      38              : impl TryFrom<u8> for IoEngine {
      39              :     type Error = u8;
      40              : 
      41       304564 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      42       304564 :         Ok(match value {
      43       304564 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      44       304442 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      45              :             #[cfg(target_os = "linux")]
      46       304442 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      47            0 :             x => return Err(x),
      48              :         })
      49       304564 :     }
      50              : }
      51              : 
      52              : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      53              : 
      54          122 : pub(crate) fn set(engine_kind: IoEngineKind) {
      55          122 :     let engine: IoEngine = engine_kind.into();
      56          122 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      57              :     #[cfg(not(test))]
      58            0 :     {
      59            0 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      60            0 :         metric.reset();
      61            0 :         metric
      62            0 :             .with_label_values(&[&format!("{engine_kind}")])
      63            0 :             .set(1);
      64            0 :     }
      65          122 : }
      66              : 
      67              : #[cfg(not(test))]
      68            0 : pub(super) fn init(engine_kind: IoEngineKind) {
      69            0 :     set(engine_kind);
      70            0 : }
      71              : 
      72              : /// Longer-term, this API should only be used by [`super::VirtualFile`].
      73       304564 : pub(crate) fn get() -> IoEngine {
      74       304564 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      75       304564 :     if cfg!(test) {
      76       304564 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      77       304564 :         match cur {
      78              :             IoEngine::NotSet => {
      79          122 :                 let kind = match std::env::var(env_var_name) {
      80          122 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      81          122 :                         Ok(engine_kind) => engine_kind,
      82            0 :                         Err(e) => {
      83            0 :                             panic!(
      84            0 :                                 "invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
      85              :                             )
      86              :                         }
      87              :                     },
      88              :                     Err(std::env::VarError::NotPresent) => {
      89              :                         #[cfg(target_os = "linux")]
      90              :                         {
      91            0 :                             IoEngineKind::TokioEpollUring
      92              :                         }
      93              :                         #[cfg(not(target_os = "linux"))]
      94              :                         {
      95              :                             IoEngineKind::StdFs
      96              :                         }
      97              :                     }
      98              :                     Err(std::env::VarError::NotUnicode(_)) => {
      99            0 :                         panic!("env var {env_var_name} is not unicode");
     100              :                     }
     101              :                 };
     102          122 :                 self::set(kind);
     103          122 :                 self::get()
     104              :             }
     105       304442 :             x => x,
     106              :         }
     107              :     } else {
     108            0 :         cur
     109              :     }
     110       304564 : }
     111              : 
     112              : use std::os::unix::prelude::FileExt;
     113              : use std::sync::atomic::{AtomicU8, Ordering};
     114              : #[cfg(target_os = "linux")]
     115              : use {std::time::Duration, tracing::info};
     116              : 
     117              : use super::owned_buffers_io::io_buf_ext::FullSlice;
     118              : use super::owned_buffers_io::slice::SliceMutExt;
     119              : use super::{FileGuard, Metadata};
     120              : 
     121              : #[cfg(target_os = "linux")]
     122            0 : pub(super) fn epoll_uring_error_to_std(
     123            0 :     e: tokio_epoll_uring::Error<std::io::Error>,
     124            0 : ) -> std::io::Error {
     125            0 :     match e {
     126            0 :         tokio_epoll_uring::Error::Op(e) => e,
     127            0 :         tokio_epoll_uring::Error::System(system) => std::io::Error::other(system),
     128              :     }
     129            0 : }
     130              : 
     131              : impl IoEngine {
     132       278582 :     pub(super) async fn read_at<Buf>(
     133       278582 :         &self,
     134       278582 :         file_guard: FileGuard,
     135       278582 :         offset: u64,
     136       278582 :         mut slice: tokio_epoll_uring::Slice<Buf>,
     137       278582 :     ) -> (
     138       278582 :         (FileGuard, tokio_epoll_uring::Slice<Buf>),
     139       278582 :         std::io::Result<usize>,
     140       278582 :     )
     141       278582 :     where
     142       278582 :         Buf: tokio_epoll_uring::IoBufMut + Send,
     143       278582 :     {
     144       278582 :         match self {
     145            0 :             IoEngine::NotSet => panic!("not initialized"),
     146              :             IoEngine::StdFs => {
     147            0 :                 let rust_slice = slice.as_mut_rust_slice_full_zeroed();
     148            0 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
     149            0 :                 ((file_guard, slice), res)
     150              :             }
     151              :             #[cfg(target_os = "linux")]
     152              :             IoEngine::TokioEpollUring => {
     153       278582 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     154       278582 :                 let (resources, res) =
     155       278582 :                     retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
     156       278582 :                         system.read(file_guard, offset, slice).await
     157       557164 :                     })
     158       278582 :                     .await;
     159       278582 :                 (resources, res.map_err(epoll_uring_error_to_std))
     160              :             }
     161              :         }
     162       278582 :     }
     163         1452 :     pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
     164         1452 :         match self {
     165            0 :             IoEngine::NotSet => panic!("not initialized"),
     166              :             IoEngine::StdFs => {
     167            0 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_all());
     168            0 :                 (file_guard, res)
     169              :             }
     170              :             #[cfg(target_os = "linux")]
     171              :             IoEngine::TokioEpollUring => {
     172         1452 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     173         1452 :                 let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
     174         1452 :                     system.fsync(file_guard).await
     175         2904 :                 })
     176         1452 :                 .await;
     177         1452 :                 (resources, res.map_err(epoll_uring_error_to_std))
     178              :             }
     179              :         }
     180         1452 :     }
     181            0 :     pub(super) async fn sync_data(
     182            0 :         &self,
     183            0 :         file_guard: FileGuard,
     184            0 :     ) -> (FileGuard, std::io::Result<()>) {
     185            0 :         match self {
     186            0 :             IoEngine::NotSet => panic!("not initialized"),
     187              :             IoEngine::StdFs => {
     188            0 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_data());
     189            0 :                 (file_guard, res)
     190              :             }
     191              :             #[cfg(target_os = "linux")]
     192              :             IoEngine::TokioEpollUring => {
     193            0 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     194            0 :                 let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
     195            0 :                     system.fdatasync(file_guard).await
     196            0 :                 })
     197            0 :                 .await;
     198            0 :                 (resources, res.map_err(epoll_uring_error_to_std))
     199              :             }
     200              :         }
     201            0 :     }
     202          930 :     pub(super) async fn metadata(
     203          930 :         &self,
     204          930 :         file_guard: FileGuard,
     205          930 :     ) -> (FileGuard, std::io::Result<Metadata>) {
     206          930 :         match self {
     207            0 :             IoEngine::NotSet => panic!("not initialized"),
     208              :             IoEngine::StdFs => {
     209            0 :                 let res =
     210            0 :                     file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
     211            0 :                 (file_guard, res)
     212              :             }
     213              :             #[cfg(target_os = "linux")]
     214              :             IoEngine::TokioEpollUring => {
     215          930 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     216          930 :                 let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
     217          930 :                     system.statx(file_guard).await
     218         1860 :                 })
     219          930 :                 .await;
     220          930 :                 (
     221          930 :                     resources,
     222          930 :                     res.map_err(epoll_uring_error_to_std).map(Metadata::from),
     223          930 :                 )
     224              :             }
     225              :         }
     226          930 :     }
     227              : 
     228            7 :     pub(super) async fn set_len(
     229            7 :         &self,
     230            7 :         file_guard: FileGuard,
     231            7 :         len: u64,
     232            7 :     ) -> (FileGuard, std::io::Result<()>) {
     233            7 :         match self {
     234            0 :             IoEngine::NotSet => panic!("not initialized"),
     235              :             IoEngine::StdFs => {
     236            0 :                 let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
     237            0 :                 (file_guard, res)
     238              :             }
     239              :             #[cfg(target_os = "linux")]
     240              :             IoEngine::TokioEpollUring => {
     241              :                 // TODO: ftruncate op for tokio-epoll-uring
     242              :                 // Don't forget to use retry_ecanceled_once
     243            7 :                 let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
     244            7 :                 (file_guard, res)
     245              :             }
     246              :         }
     247            7 :     }
     248              : 
     249        19808 :     pub(super) async fn write_at<B: IoBuf + Send>(
     250        19808 :         &self,
     251        19808 :         file_guard: FileGuard,
     252        19808 :         offset: u64,
     253        19808 :         buf: FullSlice<B>,
     254        19808 :     ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
     255        19808 :         match self {
     256            0 :             IoEngine::NotSet => panic!("not initialized"),
     257              :             IoEngine::StdFs => {
     258            0 :                 let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
     259            0 :                 ((file_guard, buf), result)
     260              :             }
     261              :             #[cfg(target_os = "linux")]
     262              :             IoEngine::TokioEpollUring => {
     263        19808 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     264        19808 :                 let ((file_guard, slice), res) = retry_ecanceled_once(
     265        19808 :                     (file_guard, buf.into_raw_slice()),
     266            0 :                     async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
     267              :                 )
     268        19808 :                 .await;
     269        19808 :                 (
     270        19808 :                     (file_guard, FullSlice::must_new(slice)),
     271        19808 :                     res.map_err(epoll_uring_error_to_std),
     272        19808 :                 )
     273              :             }
     274              :         }
     275        19808 :     }
     276              : 
     277              :     /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
     278              :     /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
     279              :     /// whereas before the switch to [`super::io_engine`], that wasn't the case.
     280              :     /// This method helps avoid such a regression.
     281              :     ///
     282              :     /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
     283            7 :     pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
     284            7 :     where
     285            7 :         Fut: 'static + Send + std::future::Future<Output = R>,
     286            7 :         R: 'static + Send,
     287            7 :     {
     288            7 :         match self {
     289            0 :             IoEngine::NotSet => panic!("not initialized"),
     290              :             IoEngine::StdFs => {
     291            0 :                 let span = tracing::info_span!("spawn_blocking_block_on_if_std");
     292            0 :                 tokio::task::spawn_blocking({
     293            0 :                     move || tokio::runtime::Handle::current().block_on(work.instrument(span))
     294              :                 })
     295            0 :                 .await
     296            0 :                 .expect("failed to join blocking code most likely it panicked, panicking as well")
     297              :             }
     298              :             #[cfg(target_os = "linux")]
     299            7 :             IoEngine::TokioEpollUring => work.await,
     300              :         }
     301            7 :     }
     302              : }
     303              : 
     304              : /// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
     305              : /// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
     306              : /// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
     307              : /// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
     308              : ///
     309              : /// This function retries the operation once if it fails with ECANCELED.
     310              : /// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
     311              : #[cfg(target_os = "linux")]
     312       399794 : pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
     313       399794 :     resources: T,
     314       399794 :     f: F,
     315       399794 : ) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
     316       399794 : where
     317       399794 :     F: Fn(T) -> Fut,
     318       399794 :     Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
     319       399794 :     T: Send,
     320       399794 :     V: Send,
     321       399794 : {
     322       399794 :     let (resources, res) = f(resources).await;
     323       399794 :     let Err(e) = res else {
     324       399794 :         return (resources, res);
     325              :     };
     326            0 :     let tokio_epoll_uring::Error::Op(err) = e else {
     327            0 :         return (resources, Err(e));
     328              :     };
     329            0 :     if err.raw_os_error() != Some(nix::libc::ECANCELED) {
     330            0 :         return (resources, Err(tokio_epoll_uring::Error::Op(err)));
     331            0 :     }
     332              :     {
     333              :         static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
     334              :             std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
     335            0 :         let mut guard = RATE_LIMIT.lock().unwrap();
     336            0 :         guard.call2(|rate_limit_stats| {
     337            0 :             info!(
     338            0 :                 %rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
     339              :             );
     340            0 :         });
     341            0 :         drop(guard);
     342              :     }
     343            0 :     tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
     344            0 :     let (resources, res) = f(resources).await;
     345            0 :     (resources, res)
     346       399794 : }
     347              : 
     348            0 : pub(super) fn panic_operation_must_be_idempotent() {
     349            0 :     panic!(
     350            0 :         "unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
     351              :     )
     352              : }
     353              : 
     354              : pub enum FeatureTestResult {
     355              :     PlatformPreferred(IoEngineKind),
     356              :     Worse {
     357              :         engine: IoEngineKind,
     358              :         remark: String,
     359              :     },
     360              : }
     361              : 
     362              : impl FeatureTestResult {
     363              :     #[cfg(target_os = "linux")]
     364              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
     365              :     #[cfg(not(target_os = "linux"))]
     366              :     const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
     367              : }
     368              : 
     369              : impl From<FeatureTestResult> for IoEngineKind {
     370            0 :     fn from(val: FeatureTestResult) -> Self {
     371            0 :         match val {
     372            0 :             FeatureTestResult::PlatformPreferred(e) => e,
     373            0 :             FeatureTestResult::Worse { engine, .. } => engine,
     374              :         }
     375            0 :     }
     376              : }
     377              : 
     378              : /// Somewhat costly under the hood, do only once.
     379              : /// Panics if we can't set up the feature test.
     380          138 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
     381          138 :     std::thread::spawn(|| {
     382              : 
     383              :         #[cfg(not(target_os = "linux"))]
     384              :         {
     385              :             Ok(FeatureTestResult::PlatformPreferred(
     386              :                 FeatureTestResult::PLATFORM_PREFERRED,
     387              :             ))
     388              :         }
     389              :         #[cfg(target_os = "linux")]
     390              :         {
     391          138 :             let rt = tokio::runtime::Builder::new_current_thread()
     392          138 :                 .enable_all()
     393          138 :                 .build()
     394          138 :                 .unwrap();
     395          138 :             Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
     396              :                 Ok(_) => FeatureTestResult::PlatformPreferred({
     397          138 :                     assert!(matches!(
     398          138 :                         IoEngineKind::TokioEpollUring,
     399              :                         FeatureTestResult::PLATFORM_PREFERRED
     400              :                     ));
     401          138 :                     FeatureTestResult::PLATFORM_PREFERRED
     402              :                 }),
     403            0 :                 Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
     404            0 :                     let remark = match e.raw_os_error() {
     405              :                         Some(nix::libc::EPERM) => {
     406              :                             // fall back
     407            0 :                             "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
     408            0 :                                 .to_string()
     409              :                         }
     410              :                     Some(nix::libc::EFAULT) => {
     411              :                             // fail feature test
     412            0 :                             anyhow::bail!(
     413            0 :                                 "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
     414              :                             );
     415              :                         }
     416              :                         Some(_) | None => {
     417              :                             // fall back
     418            0 :                             format!("creating tokio-epoll-uring fails with error: {e:#}")
     419              :                         }
     420              :                 };
     421            0 :                     FeatureTestResult::Worse {
     422            0 :                         engine: IoEngineKind::StdFs,
     423            0 :                         remark,
     424            0 :                     }
     425              :                 }
     426              :             })
     427              :         }
     428          138 :     })
     429          138 :     .join()
     430          138 :     .unwrap()
     431          138 : }
     432              : 
     433              : /// For use in benchmark binaries only.
     434              : ///
     435              : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
     436              : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
     437              : /// developer time trying to figure out why it's slow.
     438              : ///
     439              : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
     440            0 : pub fn io_engine_for_bench() -> IoEngineKind {
     441              :     #[cfg(not(target_os = "linux"))]
     442              :     {
     443              :         panic!("This benchmark does I/O and can only give a representative result on Linux");
     444              :     }
     445              :     #[cfg(target_os = "linux")]
     446              :     {
     447            0 :         match feature_test().unwrap() {
     448            0 :             FeatureTestResult::PlatformPreferred(engine) => engine,
     449              :             FeatureTestResult::Worse {
     450            0 :                 engine: _engine,
     451            0 :                 remark,
     452              :             } => {
     453            0 :                 panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
     454              :             }
     455              :         }
     456              :     }
     457            0 : }
        

Generated by: LCOV version 2.1-beta