Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 : //!
10 : //!
11 :
12 : #[cfg(target_os = "linux")]
13 : pub(super) mod tokio_epoll_uring_ext;
14 :
15 : use tokio_epoll_uring::IoBuf;
16 : use tracing::Instrument;
17 :
18 : pub(crate) use super::api::IoEngineKind;
/// Runtime representation of the process-wide IO engine selection.
///
/// The value lives in a global `AtomicU8`, hence `#[repr(u8)]`; the discriminants are spelled
/// out explicitly so the stored byte values are visible at a glance. [`IoEngine::NotSet`] is
/// the initial value until an engine is configured via [`set`].
#[derive(Clone, Copy)]
#[repr(u8)]
pub(crate) enum IoEngine {
    /// No engine configured yet; the IO methods on this type panic in this state.
    NotSet = 0,
    /// Synchronous IO through the underlying std file handle, on the calling thread.
    StdFs = 1,
    /// IO submitted to a thread-local `tokio-epoll-uring` system (Linux only).
    #[cfg(target_os = "linux")]
    TokioEpollUring = 2,
}
27 :
28 : impl From<IoEngineKind> for IoEngine {
29 1464 : fn from(value: IoEngineKind) -> Self {
30 1464 : match value {
31 732 : IoEngineKind::StdFs => IoEngine::StdFs,
32 : #[cfg(target_os = "linux")]
33 732 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
34 : }
35 1464 : }
36 : }
37 :
38 : impl TryFrom<u8> for IoEngine {
39 : type Error = u8;
40 :
41 3599394 : fn try_from(value: u8) -> Result<Self, Self::Error> {
42 3599394 : Ok(match value {
43 3599394 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
44 3597930 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
45 : #[cfg(target_os = "linux")]
46 1799090 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
47 0 : x => return Err(x),
48 : })
49 3599394 : }
50 : }
51 :
52 : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
53 :
54 1464 : pub(crate) fn set(engine_kind: IoEngineKind) {
55 1464 : let engine: IoEngine = engine_kind.into();
56 1464 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
57 1464 : #[cfg(not(test))]
58 1464 : {
59 1464 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
60 1464 : metric.reset();
61 1464 : metric
62 1464 : .with_label_values(&[&format!("{engine_kind}")])
63 1464 : .set(1);
64 1464 : }
65 1464 : }
66 :
67 : #[cfg(not(test))]
68 0 : pub(super) fn init(engine_kind: IoEngineKind) {
69 0 : set(engine_kind);
70 0 : }
71 :
72 : /// Longer-term, this API should only be used by [`super::VirtualFile`].
73 3599394 : pub(crate) fn get() -> IoEngine {
74 3599394 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
75 3599394 : if cfg!(test) {
76 3599394 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
77 3599394 : match cur {
78 : IoEngine::NotSet => {
79 1464 : let kind = match std::env::var(env_var_name) {
80 1464 : Ok(v) => match v.parse::<IoEngineKind>() {
81 1464 : Ok(engine_kind) => engine_kind,
82 0 : Err(e) => {
83 0 : panic!(
84 0 : "invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
85 0 : )
86 : }
87 : },
88 : Err(std::env::VarError::NotPresent) => {
89 : #[cfg(target_os = "linux")]
90 : {
91 0 : IoEngineKind::TokioEpollUring
92 : }
93 : #[cfg(not(target_os = "linux"))]
94 : {
95 : IoEngineKind::StdFs
96 : }
97 : }
98 : Err(std::env::VarError::NotUnicode(_)) => {
99 0 : panic!("env var {env_var_name} is not unicode");
100 : }
101 : };
102 1464 : self::set(kind);
103 1464 : self::get()
104 : }
105 3597930 : x => x,
106 : }
107 : } else {
108 0 : cur
109 : }
110 3599394 : }
111 :
112 : use std::os::unix::prelude::FileExt;
113 : use std::sync::atomic::{AtomicU8, Ordering};
114 :
115 : use super::owned_buffers_io::io_buf_ext::FullSlice;
116 : use super::owned_buffers_io::slice::SliceMutExt;
117 : use super::{FileGuard, Metadata};
118 :
119 : #[cfg(target_os = "linux")]
120 12 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
121 12 : match e {
122 12 : tokio_epoll_uring::Error::Op(e) => e,
123 0 : tokio_epoll_uring::Error::System(system) => {
124 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
125 : }
126 : }
127 12 : }
128 :
129 : impl IoEngine {
130 3288427 : pub(super) async fn read_at<Buf>(
131 3288427 : &self,
132 3288427 : file_guard: FileGuard,
133 3288427 : offset: u64,
134 3288427 : mut slice: tokio_epoll_uring::Slice<Buf>,
135 3288427 : ) -> (
136 3288427 : (FileGuard, tokio_epoll_uring::Slice<Buf>),
137 3288427 : std::io::Result<usize>,
138 3288427 : )
139 3288427 : where
140 3288427 : Buf: tokio_epoll_uring::IoBufMut + Send,
141 3288427 : {
142 3288427 : match self {
143 0 : IoEngine::NotSet => panic!("not initialized"),
144 : IoEngine::StdFs => {
145 1644086 : let rust_slice = slice.as_mut_rust_slice_full_zeroed();
146 1644086 : let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
147 1644086 : ((file_guard, slice), res)
148 : }
149 : #[cfg(target_os = "linux")]
150 : IoEngine::TokioEpollUring => {
151 1644341 : let system = tokio_epoll_uring_ext::thread_local_system().await;
152 1644341 : let (resources, res) = system.read(file_guard, offset, slice).await;
153 1644341 : (resources, res.map_err(epoll_uring_error_to_std))
154 : }
155 : }
156 3288427 : }
157 17124 : pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
158 17124 : match self {
159 0 : IoEngine::NotSet => panic!("not initialized"),
160 : IoEngine::StdFs => {
161 8562 : let res = file_guard.with_std_file(|std_file| std_file.sync_all());
162 8562 : (file_guard, res)
163 : }
164 : #[cfg(target_os = "linux")]
165 : IoEngine::TokioEpollUring => {
166 8562 : let system = tokio_epoll_uring_ext::thread_local_system().await;
167 8562 : let (resources, res) = system.fsync(file_guard).await;
168 8562 : (resources, res.map_err(epoll_uring_error_to_std))
169 : }
170 : }
171 17124 : }
172 0 : pub(super) async fn sync_data(
173 0 : &self,
174 0 : file_guard: FileGuard,
175 0 : ) -> (FileGuard, std::io::Result<()>) {
176 0 : match self {
177 0 : IoEngine::NotSet => panic!("not initialized"),
178 : IoEngine::StdFs => {
179 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_data());
180 0 : (file_guard, res)
181 : }
182 : #[cfg(target_os = "linux")]
183 : IoEngine::TokioEpollUring => {
184 0 : let system = tokio_epoll_uring_ext::thread_local_system().await;
185 0 : let (resources, res) = system.fdatasync(file_guard).await;
186 0 : (resources, res.map_err(epoll_uring_error_to_std))
187 : }
188 : }
189 0 : }
190 10992 : pub(super) async fn metadata(
191 10992 : &self,
192 10992 : file_guard: FileGuard,
193 10992 : ) -> (FileGuard, std::io::Result<Metadata>) {
194 10992 : match self {
195 0 : IoEngine::NotSet => panic!("not initialized"),
196 : IoEngine::StdFs => {
197 5496 : let res =
198 5496 : file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
199 5496 : (file_guard, res)
200 : }
201 : #[cfg(target_os = "linux")]
202 : IoEngine::TokioEpollUring => {
203 5496 : let system = tokio_epoll_uring_ext::thread_local_system().await;
204 5496 : let (resources, res) = system.statx(file_guard).await;
205 5496 : (
206 5496 : resources,
207 5496 : res.map_err(epoll_uring_error_to_std).map(Metadata::from),
208 5496 : )
209 : }
210 : }
211 10992 : }
212 :
213 84 : pub(super) async fn set_len(
214 84 : &self,
215 84 : file_guard: FileGuard,
216 84 : len: u64,
217 84 : ) -> (FileGuard, std::io::Result<()>) {
218 84 : match self {
219 0 : IoEngine::NotSet => panic!("not initialized"),
220 : IoEngine::StdFs => {
221 42 : let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
222 42 : (file_guard, res)
223 : }
224 : #[cfg(target_os = "linux")]
225 : IoEngine::TokioEpollUring => {
226 : // TODO: ftruncate op for tokio-epoll-uring
227 42 : let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
228 42 : (file_guard, res)
229 : }
230 : }
231 84 : }
232 :
233 237203 : pub(super) async fn write_at<B: IoBuf + Send>(
234 237203 : &self,
235 237203 : file_guard: FileGuard,
236 237203 : offset: u64,
237 237203 : buf: FullSlice<B>,
238 237203 : ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
239 237203 : match self {
240 0 : IoEngine::NotSet => panic!("not initialized"),
241 : IoEngine::StdFs => {
242 118604 : let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
243 118604 : ((file_guard, buf), result)
244 : }
245 : #[cfg(target_os = "linux")]
246 : IoEngine::TokioEpollUring => {
247 118599 : let system = tokio_epoll_uring_ext::thread_local_system().await;
248 118599 : let ((file_guard, slice), res) =
249 118599 : system.write(file_guard, offset, buf.into_raw_slice()).await;
250 118599 : (
251 118599 : (file_guard, FullSlice::must_new(slice)),
252 118599 : res.map_err(epoll_uring_error_to_std),
253 118599 : )
254 : }
255 : }
256 237203 : }
257 :
258 : /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
259 : /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
260 : /// whereas before the switch to [`super::io_engine`], that wasn't the case.
261 : /// This method helps avoid such a regression.
262 : ///
263 : /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
264 84 : pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
265 84 : where
266 84 : Fut: 'static + Send + std::future::Future<Output = R>,
267 84 : R: 'static + Send,
268 84 : {
269 84 : match self {
270 0 : IoEngine::NotSet => panic!("not initialized"),
271 : IoEngine::StdFs => {
272 42 : let span = tracing::info_span!("spawn_blocking_block_on_if_std");
273 42 : tokio::task::spawn_blocking({
274 42 : move || tokio::runtime::Handle::current().block_on(work.instrument(span))
275 42 : })
276 42 : .await
277 42 : .expect("failed to join blocking code most likely it panicked, panicking as well")
278 : }
279 : #[cfg(target_os = "linux")]
280 42 : IoEngine::TokioEpollUring => work.await,
281 : }
282 84 : }
283 : }
284 :
285 : pub enum FeatureTestResult {
286 : PlatformPreferred(IoEngineKind),
287 : Worse {
288 : engine: IoEngineKind,
289 : remark: String,
290 : },
291 : }
292 :
293 : impl FeatureTestResult {
294 : #[cfg(target_os = "linux")]
295 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
296 : #[cfg(not(target_os = "linux"))]
297 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
298 : }
299 :
300 : impl From<FeatureTestResult> for IoEngineKind {
301 0 : fn from(val: FeatureTestResult) -> Self {
302 0 : match val {
303 0 : FeatureTestResult::PlatformPreferred(e) => e,
304 0 : FeatureTestResult::Worse { engine, .. } => engine,
305 : }
306 0 : }
307 : }
308 :
309 : /// Somewhat costly under the hood, do only once.
310 : /// Panics if we can't set up the feature test.
311 1500 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
312 1500 : std::thread::spawn(|| {
313 1500 :
314 1500 : #[cfg(not(target_os = "linux"))]
315 1500 : {
316 1500 : Ok(FeatureTestResult::PlatformPreferred(
317 1500 : FeatureTestResult::PLATFORM_PREFERRED,
318 1500 : ))
319 1500 : }
320 1500 : #[cfg(target_os = "linux")]
321 1500 : {
322 1500 : let rt = tokio::runtime::Builder::new_current_thread()
323 1500 : .enable_all()
324 1500 : .build()
325 1500 : .unwrap();
326 1500 : Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
327 : Ok(_) => FeatureTestResult::PlatformPreferred({
328 1500 : assert!(matches!(
329 1500 : IoEngineKind::TokioEpollUring,
330 : FeatureTestResult::PLATFORM_PREFERRED
331 : ));
332 1500 : FeatureTestResult::PLATFORM_PREFERRED
333 : }),
334 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
335 0 : let remark = match e.raw_os_error() {
336 : Some(nix::libc::EPERM) => {
337 : // fall back
338 0 : "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
339 0 : .to_string()
340 : }
341 : Some(nix::libc::EFAULT) => {
342 : // fail feature test
343 0 : anyhow::bail!(
344 0 : "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
345 0 : );
346 : }
347 : Some(_) | None => {
348 : // fall back
349 0 : format!("creating tokio-epoll-uring fails with error: {e:#}")
350 : }
351 : };
352 0 : FeatureTestResult::Worse {
353 0 : engine: IoEngineKind::StdFs,
354 0 : remark,
355 0 : }
356 : }
357 : })
358 : }
359 1500 : })
360 1500 : .join()
361 1500 : .unwrap()
362 1500 : }
363 :
364 : /// For use in benchmark binaries only.
365 : ///
366 : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
367 : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
368 : /// developer time trying to figure out why it's slow.
369 : ///
370 : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
371 0 : pub fn io_engine_for_bench() -> IoEngineKind {
372 0 : #[cfg(not(target_os = "linux"))]
373 0 : {
374 0 : panic!("This benchmark does I/O and can only give a representative result on Linux");
375 0 : }
376 0 : #[cfg(target_os = "linux")]
377 0 : {
378 0 : match feature_test().unwrap() {
379 0 : FeatureTestResult::PlatformPreferred(engine) => engine,
380 : FeatureTestResult::Worse {
381 0 : engine: _engine,
382 0 : remark,
383 0 : } => {
384 0 : panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
385 : }
386 : }
387 : }
388 0 : }
|