|             Line data    Source code 
       1              : //! [`super::VirtualFile`] supports different IO engines.
       2              : //!
       3              : //! The [`IoEngineKind`] enum identifies them.
       4              : //!
       5              : //! The choice of IO engine is global.
       6              : //! Initialize using [`init`].
       7              : //!
       8              : //! Then use [`get`] and  [`super::OpenOptions`].
       9              : //!
      10              : //!
      11              : 
      12              : #[cfg(target_os = "linux")]
      13              : pub(super) mod tokio_epoll_uring_ext;
      14              : 
      15              : use tokio_epoll_uring::{IoBuf, Slice};
      16              : use tracing::Instrument;
      17              : 
      18              : pub(crate) use super::api::IoEngineKind;
      19              : #[derive(Clone, Copy)]
      20              : #[repr(u8)]
      21              : pub(crate) enum IoEngine {
      22              :     NotSet,
      23              :     StdFs,
      24              :     #[cfg(target_os = "linux")]
      25              :     TokioEpollUring,
      26              : }
      27              : 
      28              : impl From<IoEngineKind> for IoEngine {
      29          159 :     fn from(value: IoEngineKind) -> Self {
      30          159 :         match value {
      31           80 :             IoEngineKind::StdFs => IoEngine::StdFs,
      32              :             #[cfg(target_os = "linux")]
      33           79 :             IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
      34              :         }
      35          159 :     }
      36              : }
      37              : 
      38              : impl TryFrom<u8> for IoEngine {
      39              :     type Error = u8;
      40              : 
      41      1496579 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      42      1496579 :         Ok(match value {
      43      1496579 :             v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
      44      1496420 :             v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
      45              :             #[cfg(target_os = "linux")]
      46       748208 :             v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
      47            0 :             x => return Err(x),
      48              :         })
      49      1496579 :     }
      50              : }
      51              : 
/// The process-global IO engine choice, stored as the `u8` discriminant of [`IoEngine`].
///
/// Starts out as [`IoEngine::NotSet`]; written by [`set`] and read by [`get`],
/// both with relaxed ordering.
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
      53              : 
      54          159 : pub(crate) fn set(engine_kind: IoEngineKind) {
      55          159 :     let engine: IoEngine = engine_kind.into();
      56          159 :     IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
      57          159 :     #[cfg(not(test))]
      58          159 :     {
      59          159 :         let metric = &crate::metrics::virtual_file_io_engine::KIND;
      60          159 :         metric.reset();
      61          159 :         metric
      62          159 :             .with_label_values(&[&format!("{engine_kind}")])
      63          159 :             .set(1);
      64          159 :     }
      65          159 : }
      66              : 
      67              : #[cfg(not(test))]
      68            0 : pub(super) fn init(engine_kind: IoEngineKind) {
      69            0 :     set(engine_kind);
      70            0 : }
      71              : 
      72              : /// Longer-term, this API should only be used by [`super::VirtualFile`].
      73      1496579 : pub(crate) fn get() -> IoEngine {
      74      1496579 :     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
      75      1496579 :     if cfg!(test) {
      76      1496579 :         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
      77      1496579 :         match cur {
      78              :             IoEngine::NotSet => {
      79          159 :                 let kind = match std::env::var(env_var_name) {
      80          159 :                     Ok(v) => match v.parse::<IoEngineKind>() {
      81          159 :                         Ok(engine_kind) => engine_kind,
      82            0 :                         Err(e) => {
      83            0 :                             panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
      84              :                         }
      85              :                     },
      86              :                     Err(std::env::VarError::NotPresent) => {
      87            0 :                         crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
      88            0 :                             .parse()
      89            0 :                             .unwrap()
      90              :                     }
      91              :                     Err(std::env::VarError::NotUnicode(_)) => {
      92            0 :                         panic!("env var {env_var_name} is not unicode");
      93              :                     }
      94              :                 };
      95          159 :                 self::set(kind);
      96          159 :                 self::get()
      97              :             }
      98      1496420 :             x => x,
      99              :         }
     100              :     } else {
     101            0 :         cur
     102              :     }
     103      1496579 : }
     104              : 
     105              : use std::{
     106              :     os::unix::prelude::FileExt,
     107              :     sync::atomic::{AtomicU8, Ordering},
     108              : };
     109              : 
     110              : use super::{FileGuard, Metadata};
     111              : 
     112              : #[cfg(target_os = "linux")]
     113            2 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
     114            2 :     match e {
     115            2 :         tokio_epoll_uring::Error::Op(e) => e,
     116            0 :         tokio_epoll_uring::Error::System(system) => {
     117            0 :             std::io::Error::new(std::io::ErrorKind::Other, system)
     118              :         }
     119              :     }
     120            2 : }
     121              : 
     122              : impl IoEngine {
     123       373171 :     pub(super) async fn read_at<B>(
     124       373171 :         &self,
     125       373171 :         file_guard: FileGuard,
     126       373171 :         offset: u64,
     127       373171 :         mut buf: B,
     128       373171 :     ) -> ((FileGuard, B), std::io::Result<usize>)
     129       373171 :     where
     130       373171 :         B: tokio_epoll_uring::BoundedBufMut + Send,
     131       373171 :     {
     132       373171 :         match self {
     133            0 :             IoEngine::NotSet => panic!("not initialized"),
     134              :             IoEngine::StdFs => {
     135              :                 // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
     136       186580 :                 let dst = unsafe {
     137       186580 :                     std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
     138       186580 :                 };
     139       186580 :                 let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
     140       186580 :                 if let Ok(nbytes) = &res {
     141       186579 :                     assert!(*nbytes <= buf.bytes_total());
     142              :                     // SAFETY: see above assertion
     143       186579 :                     unsafe {
     144       186579 :                         buf.set_init(*nbytes);
     145       186579 :                     }
     146            1 :                 }
     147              :                 #[allow(dropping_references)]
     148       186580 :                 drop(dst);
     149       186580 :                 ((file_guard, buf), res)
     150              :             }
     151              :             #[cfg(target_os = "linux")]
     152              :             IoEngine::TokioEpollUring => {
     153       186591 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     154       186649 :                 let (resources, res) = system.read(file_guard, offset, buf).await;
     155       186591 :                 (resources, res.map_err(epoll_uring_error_to_std))
     156              :             }
     157              :         }
     158       373171 :     }
     159         2705 :     pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
     160         2705 :         match self {
     161            0 :             IoEngine::NotSet => panic!("not initialized"),
     162              :             IoEngine::StdFs => {
     163         1352 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_all());
     164         1352 :                 (file_guard, res)
     165              :             }
     166              :             #[cfg(target_os = "linux")]
     167              :             IoEngine::TokioEpollUring => {
     168         1353 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     169         1354 :                 let (resources, res) = system.fsync(file_guard).await;
     170         1353 :                 (resources, res.map_err(epoll_uring_error_to_std))
     171              :             }
     172              :         }
     173         2705 :     }
     174            0 :     pub(super) async fn sync_data(
     175            0 :         &self,
     176            0 :         file_guard: FileGuard,
     177            0 :     ) -> (FileGuard, std::io::Result<()>) {
     178            0 :         match self {
     179            0 :             IoEngine::NotSet => panic!("not initialized"),
     180              :             IoEngine::StdFs => {
     181            0 :                 let res = file_guard.with_std_file(|std_file| std_file.sync_data());
     182            0 :                 (file_guard, res)
     183              :             }
     184              :             #[cfg(target_os = "linux")]
     185              :             IoEngine::TokioEpollUring => {
     186            0 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     187            0 :                 let (resources, res) = system.fdatasync(file_guard).await;
     188            0 :                 (resources, res.map_err(epoll_uring_error_to_std))
     189              :             }
     190              :         }
     191            0 :     }
     192         1547 :     pub(super) async fn metadata(
     193         1547 :         &self,
     194         1547 :         file_guard: FileGuard,
     195         1547 :     ) -> (FileGuard, std::io::Result<Metadata>) {
     196         1547 :         match self {
     197            0 :             IoEngine::NotSet => panic!("not initialized"),
     198              :             IoEngine::StdFs => {
     199          774 :                 let res =
     200          774 :                     file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
     201          774 :                 (file_guard, res)
     202              :             }
     203              :             #[cfg(target_os = "linux")]
     204              :             IoEngine::TokioEpollUring => {
     205          773 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     206          773 :                 let (resources, res) = system.statx(file_guard).await;
     207          773 :                 (
     208          773 :                     resources,
     209          773 :                     res.map_err(epoll_uring_error_to_std).map(Metadata::from),
     210          773 :                 )
     211              :             }
     212              :         }
     213         1547 :     }
     214      1112189 :     pub(super) async fn write_at<B: IoBuf + Send>(
     215      1112189 :         &self,
     216      1112189 :         file_guard: FileGuard,
     217      1112189 :         offset: u64,
     218      1112189 :         buf: Slice<B>,
     219      1112189 :     ) -> ((FileGuard, Slice<B>), std::io::Result<usize>) {
     220      1112189 :         match self {
     221            0 :             IoEngine::NotSet => panic!("not initialized"),
     222              :             IoEngine::StdFs => {
     223       556101 :                 let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
     224       556101 :                 ((file_guard, buf), result)
     225              :             }
     226              :             #[cfg(target_os = "linux")]
     227              :             IoEngine::TokioEpollUring => {
     228       556088 :                 let system = tokio_epoll_uring_ext::thread_local_system().await;
     229       556104 :                 let (resources, res) = system.write(file_guard, offset, buf).await;
     230       556088 :                 (resources, res.map_err(epoll_uring_error_to_std))
     231              :             }
     232              :         }
     233      1112189 :     }
     234              : 
     235              :     /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
     236              :     /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
     237              :     /// whereas before the switch to [`super::io_engine`], that wasn't the case.
     238              :     /// This method helps avoid such a regression.
     239              :     ///
     240              :     /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
     241            6 :     pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
     242            6 :     where
     243            6 :         Fut: 'static + Send + std::future::Future<Output = R>,
     244            6 :         R: 'static + Send,
     245            6 :     {
     246            6 :         match self {
     247            0 :             IoEngine::NotSet => panic!("not initialized"),
     248              :             IoEngine::StdFs => {
     249            3 :                 let span = tracing::info_span!("spawn_blocking_block_on_if_std");
     250            3 :                 tokio::task::spawn_blocking({
     251            3 :                     move || tokio::runtime::Handle::current().block_on(work.instrument(span))
     252            3 :                 })
     253            3 :                 .await
     254            3 :                 .expect("failed to join blocking code most likely it panicked, panicking as well")
     255              :             }
     256              :             #[cfg(target_os = "linux")]
     257            6 :             IoEngine::TokioEpollUring => work.await,
     258              :         }
     259            6 :     }
     260              : }
     261              : 
/// Outcome of [`feature_test`]: which IO engine should be used on this system.
///
/// Convertible into the contained [`IoEngineKind`] via `From`.
pub enum FeatureTestResult {
    /// The platform-preferred engine can be used; carries that engine kind.
    PlatformPreferred(IoEngineKind),
    /// The platform-preferred engine could not be used.
    Worse {
        /// The engine to use instead.
        engine: IoEngineKind,
        /// Human-readable explanation of why the preferred engine was rejected.
        remark: String,
    },
}
     269              : 
impl FeatureTestResult {
    /// The engine preferred on this target: tokio-epoll-uring on Linux.
    #[cfg(target_os = "linux")]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
    /// The engine preferred on this target: std-fs everywhere except Linux.
    #[cfg(not(target_os = "linux"))]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
}
     276              : 
     277              : impl From<FeatureTestResult> for IoEngineKind {
     278            0 :     fn from(val: FeatureTestResult) -> Self {
     279            0 :         match val {
     280            0 :             FeatureTestResult::PlatformPreferred(e) => e,
     281            0 :             FeatureTestResult::Worse { engine, .. } => engine,
     282              :         }
     283            0 :     }
     284              : }
     285              : 
/// Determines which IO engine this system can use.
///
/// Somewhat costly under the hood, do only once.
///
/// The probe runs on a freshly spawned thread that is joined before returning.
/// On non-Linux targets it immediately reports the platform-preferred engine.
/// On Linux it builds a current-thread tokio runtime and tries to launch a
/// `tokio_epoll_uring` system:
///
/// * launch succeeds: [`FeatureTestResult::PlatformPreferred`];
/// * launch fails with `EPERM` or any other error except `EFAULT`:
///   [`FeatureTestResult::Worse`] with the std-fs engine and a remark;
/// * launch fails with `EFAULT`: an `Err` is returned.
///
/// # Panics
///
/// Panics if we can't set up the feature test, i.e. if building the tokio
/// runtime fails or the probe thread itself panics.
pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
    // NOTE(review): presumably a dedicated thread is used so that `block_on` below
    // never runs inside a caller's async runtime — confirm.
    std::thread::spawn(|| {

        #[cfg(not(target_os = "linux"))]
        {
            Ok(FeatureTestResult::PlatformPreferred(
                FeatureTestResult::PLATFORM_PREFERRED,
            ))
        }
        #[cfg(target_os = "linux")]
        {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
                Ok(_) => FeatureTestResult::PlatformPreferred({
                    // Sanity check: on Linux the preferred engine must be tokio-epoll-uring.
                    assert!(matches!(
                        IoEngineKind::TokioEpollUring,
                        FeatureTestResult::PLATFORM_PREFERRED
                    ));
                    FeatureTestResult::PLATFORM_PREFERRED
                }),
                Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
                    // Every branch except EFAULT yields a remark and falls back to std-fs.
                    let remark = match e.raw_os_error() {
                        Some(nix::libc::EPERM) => {
                            // fall back
                            "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
                                .to_string()
                        }
                    Some(nix::libc::EFAULT) => {
                            // fail feature test
                            anyhow::bail!(
                                "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
                            );
                        }
                        Some(_) | None => {
                            // fall back
                            format!("creating tokio-epoll-uring fails with error: {e:#}")
                        }
                };
                    FeatureTestResult::Worse {
                        engine: IoEngineKind::StdFs,
                        remark,
                    }
                }
            })
        }
    })
    // A panic on the probe thread surfaces here as a panic of the caller.
    .join()
    .unwrap()
}
         |