Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 : //!
10 : //!
11 :
12 : #[cfg(target_os = "linux")]
13 : pub(super) mod tokio_epoll_uring_ext;
14 :
15 : use tokio_epoll_uring::{IoBuf, Slice};
16 : use tracing::Instrument;
17 :
18 : pub(crate) use super::api::IoEngineKind;
/// Runtime representation of the globally selected IO engine.
///
/// Unlike [`IoEngineKind`], this type has a [`IoEngine::NotSet`] state that represents
/// "no engine has been chosen yet". It is `#[repr(u8)]` so that its discriminant can be
/// stored in and loaded from an atomic byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum IoEngine {
    /// No engine has been selected yet; IO methods panic when called in this state.
    NotSet,
    /// Perform IO through blocking `std::fs::File` calls.
    StdFs,
    /// Perform IO through `tokio-epoll-uring`; only available on Linux.
    #[cfg(target_os = "linux")]
    TokioEpollUring,
}
27 :
28 : impl From<IoEngineKind> for IoEngine {
29 158 : fn from(value: IoEngineKind) -> Self {
30 158 : match value {
31 79 : IoEngineKind::StdFs => IoEngine::StdFs,
32 : #[cfg(target_os = "linux")]
33 79 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
34 : }
35 158 : }
36 : }
37 :
38 : impl TryFrom<u8> for IoEngine {
39 : type Error = u8;
40 :
41 1496945 : fn try_from(value: u8) -> Result<Self, Self::Error> {
42 1496945 : Ok(match value {
43 1496945 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
44 1496787 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
45 : #[cfg(target_os = "linux")]
46 748460 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
47 0 : x => return Err(x),
48 : })
49 1496945 : }
50 : }
51 :
52 : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
53 :
54 158 : pub(crate) fn set(engine_kind: IoEngineKind) {
55 158 : let engine: IoEngine = engine_kind.into();
56 158 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
57 158 : #[cfg(not(test))]
58 158 : {
59 158 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
60 158 : metric.reset();
61 158 : metric
62 158 : .with_label_values(&[&format!("{engine_kind}")])
63 158 : .set(1);
64 158 : }
65 158 : }
66 :
67 : #[cfg(not(test))]
68 0 : pub(super) fn init(engine_kind: IoEngineKind) {
69 0 : set(engine_kind);
70 0 : }
71 :
72 : /// Longer-term, this API should only be used by [`super::VirtualFile`].
73 1496945 : pub(crate) fn get() -> IoEngine {
74 1496945 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
75 1496945 : if cfg!(test) {
76 1496945 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
77 1496945 : match cur {
78 : IoEngine::NotSet => {
79 158 : let kind = match std::env::var(env_var_name) {
80 158 : Ok(v) => match v.parse::<IoEngineKind>() {
81 158 : Ok(engine_kind) => engine_kind,
82 0 : Err(e) => {
83 0 : panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
84 : }
85 : },
86 : Err(std::env::VarError::NotPresent) => {
87 0 : crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
88 0 : .parse()
89 0 : .unwrap()
90 : }
91 : Err(std::env::VarError::NotUnicode(_)) => {
92 0 : panic!("env var {env_var_name} is not unicode");
93 : }
94 : };
95 158 : self::set(kind);
96 158 : self::get()
97 : }
98 1496787 : x => x,
99 : }
100 : } else {
101 0 : cur
102 : }
103 1496945 : }
104 :
105 : use std::{
106 : os::unix::prelude::FileExt,
107 : sync::atomic::{AtomicU8, Ordering},
108 : };
109 :
110 : use super::{FileGuard, Metadata};
111 :
112 : #[cfg(target_os = "linux")]
113 2 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
114 2 : match e {
115 2 : tokio_epoll_uring::Error::Op(e) => e,
116 0 : tokio_epoll_uring::Error::System(system) => {
117 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
118 : }
119 : }
120 2 : }
121 :
122 : impl IoEngine {
123 373551 : pub(super) async fn read_at<B>(
124 373551 : &self,
125 373551 : file_guard: FileGuard,
126 373551 : offset: u64,
127 373551 : mut buf: B,
128 373551 : ) -> ((FileGuard, B), std::io::Result<usize>)
129 373551 : where
130 373551 : B: tokio_epoll_uring::BoundedBufMut + Send,
131 373551 : {
132 373551 : match self {
133 0 : IoEngine::NotSet => panic!("not initialized"),
134 : IoEngine::StdFs => {
135 : // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
136 186712 : let dst = unsafe {
137 186712 : std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
138 186712 : };
139 186712 : let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
140 186712 : if let Ok(nbytes) = &res {
141 186711 : assert!(*nbytes <= buf.bytes_total());
142 : // SAFETY: see above assertion
143 186711 : unsafe {
144 186711 : buf.set_init(*nbytes);
145 186711 : }
146 1 : }
147 : #[allow(dropping_references)]
148 186712 : drop(dst);
149 186712 : ((file_guard, buf), res)
150 : }
151 : #[cfg(target_os = "linux")]
152 : IoEngine::TokioEpollUring => {
153 186839 : let system = tokio_epoll_uring_ext::thread_local_system().await;
154 186875 : let (resources, res) = system.read(file_guard, offset, buf).await;
155 186839 : (resources, res.map_err(epoll_uring_error_to_std))
156 : }
157 : }
158 373551 : }
159 2703 : pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
160 2703 : match self {
161 0 : IoEngine::NotSet => panic!("not initialized"),
162 : IoEngine::StdFs => {
163 1350 : let res = file_guard.with_std_file(|std_file| std_file.sync_all());
164 1350 : (file_guard, res)
165 : }
166 : #[cfg(target_os = "linux")]
167 : IoEngine::TokioEpollUring => {
168 1353 : let system = tokio_epoll_uring_ext::thread_local_system().await;
169 1354 : let (resources, res) = system.fsync(file_guard).await;
170 1353 : (resources, res.map_err(epoll_uring_error_to_std))
171 : }
172 : }
173 2703 : }
174 0 : pub(super) async fn sync_data(
175 0 : &self,
176 0 : file_guard: FileGuard,
177 0 : ) -> (FileGuard, std::io::Result<()>) {
178 0 : match self {
179 0 : IoEngine::NotSet => panic!("not initialized"),
180 : IoEngine::StdFs => {
181 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_data());
182 0 : (file_guard, res)
183 : }
184 : #[cfg(target_os = "linux")]
185 : IoEngine::TokioEpollUring => {
186 0 : let system = tokio_epoll_uring_ext::thread_local_system().await;
187 0 : let (resources, res) = system.fdatasync(file_guard).await;
188 0 : (resources, res.map_err(epoll_uring_error_to_std))
189 : }
190 : }
191 0 : }
192 1546 : pub(super) async fn metadata(
193 1546 : &self,
194 1546 : file_guard: FileGuard,
195 1546 : ) -> (FileGuard, std::io::Result<Metadata>) {
196 1546 : match self {
197 0 : IoEngine::NotSet => panic!("not initialized"),
198 : IoEngine::StdFs => {
199 773 : let res =
200 773 : file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
201 773 : (file_guard, res)
202 : }
203 : #[cfg(target_os = "linux")]
204 : IoEngine::TokioEpollUring => {
205 773 : let system = tokio_epoll_uring_ext::thread_local_system().await;
206 773 : let (resources, res) = system.statx(file_guard).await;
207 773 : (
208 773 : resources,
209 773 : res.map_err(epoll_uring_error_to_std).map(Metadata::from),
210 773 : )
211 : }
212 : }
213 1546 : }
214 1112184 : pub(super) async fn write_at<B: IoBuf + Send>(
215 1112184 : &self,
216 1112184 : file_guard: FileGuard,
217 1112184 : offset: u64,
218 1112184 : buf: Slice<B>,
219 1112184 : ) -> ((FileGuard, Slice<B>), std::io::Result<usize>) {
220 1112184 : match self {
221 0 : IoEngine::NotSet => panic!("not initialized"),
222 : IoEngine::StdFs => {
223 556092 : let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
224 556092 : ((file_guard, buf), result)
225 : }
226 : #[cfg(target_os = "linux")]
227 : IoEngine::TokioEpollUring => {
228 556092 : let system = tokio_epoll_uring_ext::thread_local_system().await;
229 556111 : let (resources, res) = system.write(file_guard, offset, buf).await;
230 556092 : (resources, res.map_err(epoll_uring_error_to_std))
231 : }
232 : }
233 1112184 : }
234 :
235 : /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
236 : /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
237 : /// whereas before the switch to [`super::io_engine`], that wasn't the case.
238 : /// This method helps avoid such a regression.
239 : ///
240 : /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
241 6 : pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
242 6 : where
243 6 : Fut: 'static + Send + std::future::Future<Output = R>,
244 6 : R: 'static + Send,
245 6 : {
246 6 : match self {
247 0 : IoEngine::NotSet => panic!("not initialized"),
248 : IoEngine::StdFs => {
249 3 : let span = tracing::info_span!("spawn_blocking_block_on_if_std");
250 3 : tokio::task::spawn_blocking({
251 3 : move || tokio::runtime::Handle::current().block_on(work.instrument(span))
252 3 : })
253 3 : .await
254 3 : .expect("failed to join blocking code most likely it panicked, panicking as well")
255 : }
256 : #[cfg(target_os = "linux")]
257 6 : IoEngine::TokioEpollUring => work.await,
258 : }
259 6 : }
260 : }
261 :
262 : pub enum FeatureTestResult {
263 : PlatformPreferred(IoEngineKind),
264 : Worse {
265 : engine: IoEngineKind,
266 : remark: String,
267 : },
268 : }
269 :
270 : impl FeatureTestResult {
271 : #[cfg(target_os = "linux")]
272 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
273 : #[cfg(not(target_os = "linux"))]
274 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
275 : }
276 :
277 : impl From<FeatureTestResult> for IoEngineKind {
278 0 : fn from(val: FeatureTestResult) -> Self {
279 0 : match val {
280 0 : FeatureTestResult::PlatformPreferred(e) => e,
281 0 : FeatureTestResult::Worse { engine, .. } => engine,
282 : }
283 0 : }
284 : }
285 :
286 : /// Somewhat costly under the hood, do only once.
287 : /// Panics if we can't set up the feature test.
288 18 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
289 18 : std::thread::spawn(|| {
290 18 :
291 18 : #[cfg(not(target_os = "linux"))]
292 18 : {
293 18 : Ok(FeatureTestResult::PlatformPreferred(
294 18 : FeatureTestResult::PLATFORM_PREFERRED,
295 18 : ))
296 18 : }
297 18 : #[cfg(target_os = "linux")]
298 18 : {
299 18 : let rt = tokio::runtime::Builder::new_current_thread()
300 18 : .enable_all()
301 18 : .build()
302 18 : .unwrap();
303 18 : Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
304 : Ok(_) => FeatureTestResult::PlatformPreferred({
305 18 : assert!(matches!(
306 18 : IoEngineKind::TokioEpollUring,
307 : FeatureTestResult::PLATFORM_PREFERRED
308 : ));
309 18 : FeatureTestResult::PLATFORM_PREFERRED
310 : }),
311 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
312 0 : let remark = match e.raw_os_error() {
313 : Some(nix::libc::EPERM) => {
314 : // fall back
315 0 : "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
316 0 : .to_string()
317 : }
318 : Some(nix::libc::EFAULT) => {
319 : // fail feature test
320 0 : anyhow::bail!(
321 0 : "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
322 0 : );
323 : }
324 : Some(_) | None => {
325 : // fall back
326 0 : format!("creating tokio-epoll-uring fails with error: {e:#}")
327 : }
328 : };
329 0 : FeatureTestResult::Worse {
330 0 : engine: IoEngineKind::StdFs,
331 0 : remark,
332 0 : }
333 : }
334 : })
335 : }
336 18 : })
337 18 : .join()
338 18 : .unwrap()
339 18 : }
|