Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 :
10 : pub(crate) use super::api::IoEngineKind;
11 0 : #[derive(Clone, Copy)]
12 : #[repr(u8)]
13 : pub(crate) enum IoEngine {
14 : NotSet,
15 : StdFs,
16 : #[cfg(target_os = "linux")]
17 : TokioEpollUring,
18 : }
19 :
20 : impl From<IoEngineKind> for IoEngine {
21 723 : fn from(value: IoEngineKind) -> Self {
22 723 : match value {
23 674 : IoEngineKind::StdFs => IoEngine::StdFs,
24 : #[cfg(target_os = "linux")]
25 49 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
26 : }
27 723 : }
28 : }
29 :
30 : impl TryFrom<u8> for IoEngine {
31 : type Error = u8;
32 :
33 5208058 : fn try_from(value: u8) -> Result<Self, Self::Error> {
34 5208058 : Ok(match value {
35 5208058 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
36 5207960 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
37 : #[cfg(target_os = "linux")]
38 145705 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
39 0 : x => return Err(x),
40 : })
41 5208058 : }
42 : }
43 :
44 : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
45 :
46 723 : pub(crate) fn set(engine_kind: IoEngineKind) {
47 723 : let engine: IoEngine = engine_kind.into();
48 723 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
49 723 : #[cfg(not(test))]
50 723 : {
51 723 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
52 723 : metric.reset();
53 723 : metric
54 723 : .with_label_values(&[&format!("{engine_kind}")])
55 723 : .set(1);
56 723 : }
57 723 : }
58 :
59 : #[cfg(not(test))]
60 625 : pub(super) fn init(engine_kind: IoEngineKind) {
61 625 : set(engine_kind);
62 625 : }
63 :
64 5208058 : pub(super) fn get() -> IoEngine {
65 5208058 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
66 5208058 : if cfg!(test) {
67 291665 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
68 291665 : match cur {
69 : IoEngine::NotSet => {
70 98 : let kind = match std::env::var(env_var_name) {
71 98 : Ok(v) => match v.parse::<IoEngineKind>() {
72 98 : Ok(engine_kind) => engine_kind,
73 0 : Err(e) => {
74 0 : panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
75 : }
76 : },
77 : Err(std::env::VarError::NotPresent) => {
78 0 : crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
79 0 : .parse()
80 0 : .unwrap()
81 : }
82 : Err(std::env::VarError::NotUnicode(_)) => {
83 0 : panic!("env var {env_var_name} is not unicode");
84 : }
85 : };
86 98 : self::set(kind);
87 98 : self::get()
88 : }
89 291567 : x => x,
90 : }
91 : } else {
92 4916393 : cur
93 : }
94 5208058 : }
95 :
96 : use std::{
97 : os::unix::prelude::FileExt,
98 : sync::atomic::{AtomicU8, Ordering},
99 : };
100 :
101 : use super::FileGuard;
102 :
103 : impl IoEngine {
104 5145996 : pub(super) async fn read_at<B>(
105 5145996 : &self,
106 5145996 : file_guard: FileGuard,
107 5145996 : offset: u64,
108 5145996 : mut buf: B,
109 5145996 : ) -> ((FileGuard, B), std::io::Result<usize>)
110 5145996 : where
111 5145996 : B: tokio_epoll_uring::BoundedBufMut + Send,
112 5145996 : {
113 5145996 : match self {
114 0 : IoEngine::NotSet => panic!("not initialized"),
115 : IoEngine::StdFs => {
116 : // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
117 5001285 : let dst = unsafe {
118 5001285 : std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
119 5001285 : };
120 5001285 : let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
121 5001285 : if let Ok(nbytes) = &res {
122 5001284 : assert!(*nbytes <= buf.bytes_total());
123 : // SAFETY: see above assertion
124 5001284 : unsafe {
125 5001284 : buf.set_init(*nbytes);
126 5001284 : }
127 1 : }
128 : #[allow(dropping_references)]
129 5001285 : drop(dst);
130 5001285 : ((file_guard, buf), res)
131 : }
132 : #[cfg(target_os = "linux")]
133 : IoEngine::TokioEpollUring => {
134 144711 : let system = tokio_epoll_uring::thread_local_system().await;
135 144726 : let (resources, res) = system.read(file_guard, offset, buf).await;
136 144711 : (
137 144711 : resources,
138 144711 : res.map_err(|e| match e {
139 1 : tokio_epoll_uring::Error::Op(e) => e,
140 0 : tokio_epoll_uring::Error::System(system) => {
141 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
142 : }
143 144711 : }),
144 144711 : )
145 : }
146 : }
147 5145996 : }
148 : }
|