Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 : //!
10 : //!
11 :
12 : #[cfg(target_os = "linux")]
13 : pub(super) mod tokio_epoll_uring_ext;
14 :
15 : use tokio_epoll_uring::IoBuf;
16 : use tracing::Instrument;
17 :
18 : pub(crate) use super::api::IoEngineKind;
/// The process-wide IO engine selection, in the compact form that is stored
/// in the `IO_ENGINE` atomic as a `u8`.
///
/// The discriminants are spelled out because they are what gets written to and
/// read back from that atomic; they match the values the compiler would have
/// assigned implicitly.
#[derive(Clone, Copy)]
#[repr(u8)]
pub(crate) enum IoEngine {
    /// No engine has been chosen yet; every IO method panics in this state.
    NotSet = 0,
    /// IO is performed through the `std::fs::File` positional APIs.
    StdFs = 1,
    /// IO is submitted through `tokio_epoll_uring` (Linux only).
    #[cfg(target_os = "linux")]
    TokioEpollUring = 2,
}
27 :
28 : impl From<IoEngineKind> for IoEngine {
29 606 : fn from(value: IoEngineKind) -> Self {
30 606 : match value {
31 303 : IoEngineKind::StdFs => IoEngine::StdFs,
32 : #[cfg(target_os = "linux")]
33 303 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
34 : }
35 606 : }
36 : }
37 :
38 : impl TryFrom<u8> for IoEngine {
39 : type Error = u8;
40 :
41 5728650 : fn try_from(value: u8) -> Result<Self, Self::Error> {
42 5728650 : Ok(match value {
43 5728650 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
44 5728044 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
45 : #[cfg(target_os = "linux")]
46 2864045 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
47 0 : x => return Err(x),
48 : })
49 5728650 : }
50 : }
51 :
/// The globally selected IO engine, stored as the `u8` discriminant of [`IoEngine`].
///
/// Starts out as [`IoEngine::NotSet`]; written by [`set`] and decoded again by [`get`].
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
53 :
54 606 : pub(crate) fn set(engine_kind: IoEngineKind) {
55 606 : let engine: IoEngine = engine_kind.into();
56 606 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
57 606 : #[cfg(not(test))]
58 606 : {
59 606 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
60 606 : metric.reset();
61 606 : metric
62 606 : .with_label_values(&[&format!("{engine_kind}")])
63 606 : .set(1);
64 606 : }
65 606 : }
66 :
67 : #[cfg(not(test))]
68 0 : pub(super) fn init(engine_kind: IoEngineKind) {
69 0 : set(engine_kind);
70 0 : }
71 :
72 : /// Longer-term, this API should only be used by [`super::VirtualFile`].
73 5728650 : pub(crate) fn get() -> IoEngine {
74 5728650 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
75 5728650 : if cfg!(test) {
76 5728650 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
77 5728650 : match cur {
78 : IoEngine::NotSet => {
79 606 : let kind = match std::env::var(env_var_name) {
80 606 : Ok(v) => match v.parse::<IoEngineKind>() {
81 606 : Ok(engine_kind) => engine_kind,
82 0 : Err(e) => {
83 0 : panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
84 : }
85 : },
86 : Err(std::env::VarError::NotPresent) => {
87 : #[cfg(target_os = "linux")]
88 : {
89 0 : IoEngineKind::TokioEpollUring
90 : }
91 : #[cfg(not(target_os = "linux"))]
92 : {
93 : IoEngineKind::StdFs
94 : }
95 : }
96 : Err(std::env::VarError::NotUnicode(_)) => {
97 0 : panic!("env var {env_var_name} is not unicode");
98 : }
99 : };
100 606 : self::set(kind);
101 606 : self::get()
102 : }
103 5728044 : x => x,
104 : }
105 : } else {
106 0 : cur
107 : }
108 5728650 : }
109 :
110 : use std::{
111 : os::unix::prelude::FileExt,
112 : sync::atomic::{AtomicU8, Ordering},
113 : };
114 :
115 : use super::{
116 : owned_buffers_io::{io_buf_ext::FullSlice, slice::SliceMutExt},
117 : FileGuard, Metadata,
118 : };
119 :
120 : #[cfg(target_os = "linux")]
121 6 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
122 6 : match e {
123 6 : tokio_epoll_uring::Error::Op(e) => e,
124 0 : tokio_epoll_uring::Error::System(system) => {
125 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
126 : }
127 : }
128 6 : }
129 :
130 : impl IoEngine {
131 2286202 : pub(super) async fn read_at<Buf>(
132 2286202 : &self,
133 2286202 : file_guard: FileGuard,
134 2286202 : offset: u64,
135 2286202 : mut slice: tokio_epoll_uring::Slice<Buf>,
136 2286202 : ) -> (
137 2286202 : (FileGuard, tokio_epoll_uring::Slice<Buf>),
138 2286202 : std::io::Result<usize>,
139 2286202 : )
140 2286202 : where
141 2286202 : Buf: tokio_epoll_uring::IoBufMut + Send,
142 2286202 : {
143 2286202 : match self {
144 0 : IoEngine::NotSet => panic!("not initialized"),
145 : IoEngine::StdFs => {
146 1143080 : let rust_slice = slice.as_mut_rust_slice_full_zeroed();
147 1143080 : let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
148 1143080 : ((file_guard, slice), res)
149 : }
150 : #[cfg(target_os = "linux")]
151 : IoEngine::TokioEpollUring => {
152 1143122 : let system = tokio_epoll_uring_ext::thread_local_system().await;
153 1143188 : let (resources, res) = system.read(file_guard, offset, slice).await;
154 1143122 : (resources, res.map_err(epoll_uring_error_to_std))
155 : }
156 : }
157 2286202 : }
158 8139 : pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
159 8139 : match self {
160 0 : IoEngine::NotSet => panic!("not initialized"),
161 : IoEngine::StdFs => {
162 4065 : let res = file_guard.with_std_file(|std_file| std_file.sync_all());
163 4065 : (file_guard, res)
164 : }
165 : #[cfg(target_os = "linux")]
166 : IoEngine::TokioEpollUring => {
167 4074 : let system = tokio_epoll_uring_ext::thread_local_system().await;
168 4074 : let (resources, res) = system.fsync(file_guard).await;
169 4074 : (resources, res.map_err(epoll_uring_error_to_std))
170 : }
171 : }
172 8139 : }
173 0 : pub(super) async fn sync_data(
174 0 : &self,
175 0 : file_guard: FileGuard,
176 0 : ) -> (FileGuard, std::io::Result<()>) {
177 0 : match self {
178 0 : IoEngine::NotSet => panic!("not initialized"),
179 : IoEngine::StdFs => {
180 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_data());
181 0 : (file_guard, res)
182 : }
183 : #[cfg(target_os = "linux")]
184 : IoEngine::TokioEpollUring => {
185 0 : let system = tokio_epoll_uring_ext::thread_local_system().await;
186 0 : let (resources, res) = system.fdatasync(file_guard).await;
187 0 : (resources, res.map_err(epoll_uring_error_to_std))
188 : }
189 : }
190 0 : }
191 5130 : pub(super) async fn metadata(
192 5130 : &self,
193 5130 : file_guard: FileGuard,
194 5130 : ) -> (FileGuard, std::io::Result<Metadata>) {
195 5130 : match self {
196 0 : IoEngine::NotSet => panic!("not initialized"),
197 : IoEngine::StdFs => {
198 2565 : let res =
199 2565 : file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
200 2565 : (file_guard, res)
201 : }
202 : #[cfg(target_os = "linux")]
203 : IoEngine::TokioEpollUring => {
204 2565 : let system = tokio_epoll_uring_ext::thread_local_system().await;
205 2565 : let (resources, res) = system.statx(file_guard).await;
206 2565 : (
207 2565 : resources,
208 2565 : res.map_err(epoll_uring_error_to_std).map(Metadata::from),
209 2565 : )
210 : }
211 : }
212 5130 : }
213 3407498 : pub(super) async fn write_at<B: IoBuf + Send>(
214 3407498 : &self,
215 3407498 : file_guard: FileGuard,
216 3407498 : offset: u64,
217 3407498 : buf: FullSlice<B>,
218 3407498 : ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
219 3407498 : match self {
220 0 : IoEngine::NotSet => panic!("not initialized"),
221 : IoEngine::StdFs => {
222 1703756 : let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
223 1703756 : ((file_guard, buf), result)
224 : }
225 : #[cfg(target_os = "linux")]
226 : IoEngine::TokioEpollUring => {
227 1703742 : let system = tokio_epoll_uring_ext::thread_local_system().await;
228 1703742 : let ((file_guard, slice), res) =
229 1703748 : system.write(file_guard, offset, buf.into_raw_slice()).await;
230 1703742 : (
231 1703742 : (file_guard, FullSlice::must_new(slice)),
232 1703742 : res.map_err(epoll_uring_error_to_std),
233 1703742 : )
234 : }
235 : }
236 3407498 : }
237 :
238 : /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
239 : /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
240 : /// whereas before the switch to [`super::io_engine`], that wasn't the case.
241 : /// This method helps avoid such a regression.
242 : ///
243 : /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
244 18 : pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
245 18 : where
246 18 : Fut: 'static + Send + std::future::Future<Output = R>,
247 18 : R: 'static + Send,
248 18 : {
249 18 : match self {
250 0 : IoEngine::NotSet => panic!("not initialized"),
251 : IoEngine::StdFs => {
252 9 : let span = tracing::info_span!("spawn_blocking_block_on_if_std");
253 9 : tokio::task::spawn_blocking({
254 9 : move || tokio::runtime::Handle::current().block_on(work.instrument(span))
255 9 : })
256 9 : .await
257 9 : .expect("failed to join blocking code most likely it panicked, panicking as well")
258 : }
259 : #[cfg(target_os = "linux")]
260 18 : IoEngine::TokioEpollUring => work.await,
261 : }
262 18 : }
263 : }
264 :
/// Outcome of [`feature_test`]: which IO engine can be used on this system.
pub enum FeatureTestResult {
    /// The platform's preferred engine is usable; carries that engine kind.
    PlatformPreferred(IoEngineKind),
    /// The preferred engine could not be used, so a fallback was chosen.
    Worse {
        /// The fallback engine to use instead.
        engine: IoEngineKind,
        /// Human-readable explanation of why the preferred engine was rejected.
        remark: String,
    },
}
272 :
impl FeatureTestResult {
    /// The engine preferred on this platform: `TokioEpollUring` on Linux.
    #[cfg(target_os = "linux")]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
    /// The engine preferred on this platform: `StdFs` everywhere except Linux.
    #[cfg(not(target_os = "linux"))]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
}
279 :
280 : impl From<FeatureTestResult> for IoEngineKind {
281 0 : fn from(val: FeatureTestResult) -> Self {
282 0 : match val {
283 0 : FeatureTestResult::PlatformPreferred(e) => e,
284 0 : FeatureTestResult::Worse { engine, .. } => engine,
285 : }
286 0 : }
287 : }
288 :
289 : /// Somewhat costly under the hood, do only once.
290 : /// Panics if we can't set up the feature test.
291 624 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
292 624 : std::thread::spawn(|| {
293 624 :
294 624 : #[cfg(not(target_os = "linux"))]
295 624 : {
296 624 : Ok(FeatureTestResult::PlatformPreferred(
297 624 : FeatureTestResult::PLATFORM_PREFERRED,
298 624 : ))
299 624 : }
300 624 : #[cfg(target_os = "linux")]
301 624 : {
302 624 : let rt = tokio::runtime::Builder::new_current_thread()
303 624 : .enable_all()
304 624 : .build()
305 624 : .unwrap();
306 624 : Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
307 : Ok(_) => FeatureTestResult::PlatformPreferred({
308 624 : assert!(matches!(
309 624 : IoEngineKind::TokioEpollUring,
310 : FeatureTestResult::PLATFORM_PREFERRED
311 : ));
312 624 : FeatureTestResult::PLATFORM_PREFERRED
313 : }),
314 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
315 0 : let remark = match e.raw_os_error() {
316 : Some(nix::libc::EPERM) => {
317 : // fall back
318 0 : "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
319 0 : .to_string()
320 : }
321 : Some(nix::libc::EFAULT) => {
322 : // fail feature test
323 0 : anyhow::bail!(
324 0 : "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
325 0 : );
326 : }
327 : Some(_) | None => {
328 : // fall back
329 0 : format!("creating tokio-epoll-uring fails with error: {e:#}")
330 : }
331 : };
332 0 : FeatureTestResult::Worse {
333 0 : engine: IoEngineKind::StdFs,
334 0 : remark,
335 0 : }
336 : }
337 : })
338 : }
339 624 : })
340 624 : .join()
341 624 : .unwrap()
342 624 : }
343 :
344 : /// For use in benchmark binaries only.
345 : ///
346 : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
347 : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
348 : /// developer time trying to figure out why it's slow.
349 : ///
350 : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
351 0 : pub fn io_engine_for_bench() -> IoEngineKind {
352 0 : #[cfg(not(target_os = "linux"))]
353 0 : {
354 0 : panic!("This benchmark does I/O and can only give a representative result on Linux");
355 0 : }
356 0 : #[cfg(target_os = "linux")]
357 0 : {
358 0 : match feature_test().unwrap() {
359 0 : FeatureTestResult::PlatformPreferred(engine) => engine,
360 : FeatureTestResult::Worse {
361 0 : engine: _engine,
362 0 : remark,
363 0 : } => {
364 0 : panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
365 : }
366 : }
367 : }
368 0 : }
|