Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 : //!
10 : //!
11 :
12 : #[cfg(target_os = "linux")]
13 : pub(super) mod tokio_epoll_uring_ext;
14 :
15 : use tokio_epoll_uring::IoBuf;
16 : use tracing::Instrument;
17 :
18 : pub(crate) use super::api::IoEngineKind;
/// The IO engine in effect at runtime.
///
/// Unlike [`IoEngineKind`], this has an extra [`IoEngine::NotSet`] state. The selected engine is
/// kept in a process-global `AtomicU8` as its discriminant, so the discriminants are spelled out
/// explicitly here.
#[derive(Clone, Copy)]
#[repr(u8)]
pub(crate) enum IoEngine {
    /// No engine has been configured yet; the IO methods panic with "not initialized".
    NotSet = 0,
    /// Blocking IO through `std::fs::File` on the calling thread.
    StdFs = 1,
    /// IO submitted to a thread-local `tokio-epoll-uring` system.
    #[cfg(target_os = "linux")]
    TokioEpollUring = 2,
}
27 :
28 : impl From<IoEngineKind> for IoEngine {
29 190 : fn from(value: IoEngineKind) -> Self {
30 190 : match value {
31 95 : IoEngineKind::StdFs => IoEngine::StdFs,
32 : #[cfg(target_os = "linux")]
33 95 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
34 : }
35 190 : }
36 : }
37 :
38 : impl TryFrom<u8> for IoEngine {
39 : type Error = u8;
40 :
41 1634899 : fn try_from(value: u8) -> Result<Self, Self::Error> {
42 1634899 : Ok(match value {
43 1634899 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
44 1634709 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
45 : #[cfg(target_os = "linux")]
46 817301 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
47 0 : x => return Err(x),
48 : })
49 1634899 : }
50 : }
51 :
/// Process-global IO engine selection, holding an [`IoEngine`] discriminant.
///
/// Starts out as [`IoEngine::NotSet`]. Written by `set` and read by `get`, both using
/// `Ordering::Relaxed`.
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
53 :
54 190 : pub(crate) fn set(engine_kind: IoEngineKind) {
55 190 : let engine: IoEngine = engine_kind.into();
56 190 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
57 190 : #[cfg(not(test))]
58 190 : {
59 190 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
60 190 : metric.reset();
61 190 : metric
62 190 : .with_label_values(&[&format!("{engine_kind}")])
63 190 : .set(1);
64 190 : }
65 190 : }
66 :
67 : #[cfg(not(test))]
68 0 : pub(super) fn init(engine_kind: IoEngineKind) {
69 0 : set(engine_kind);
70 0 : }
71 :
72 : /// Longer-term, this API should only be used by [`super::VirtualFile`].
73 1634899 : pub(crate) fn get() -> IoEngine {
74 1634899 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
75 1634899 : if cfg!(test) {
76 1634899 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
77 1634899 : match cur {
78 : IoEngine::NotSet => {
79 190 : let kind = match std::env::var(env_var_name) {
80 190 : Ok(v) => match v.parse::<IoEngineKind>() {
81 190 : Ok(engine_kind) => engine_kind,
82 0 : Err(e) => {
83 0 : panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
84 : }
85 : },
86 : Err(std::env::VarError::NotPresent) => {
87 0 : crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
88 0 : .parse()
89 0 : .unwrap()
90 : }
91 : Err(std::env::VarError::NotUnicode(_)) => {
92 0 : panic!("env var {env_var_name} is not unicode");
93 : }
94 : };
95 190 : self::set(kind);
96 190 : self::get()
97 : }
98 1634709 : x => x,
99 : }
100 : } else {
101 0 : cur
102 : }
103 1634899 : }
104 :
105 : use std::{
106 : os::unix::prelude::FileExt,
107 : sync::atomic::{AtomicU8, Ordering},
108 : };
109 :
110 : use super::{
111 : owned_buffers_io::{io_buf_ext::FullSlice, slice::SliceMutExt},
112 : FileGuard, Metadata,
113 : };
114 :
115 : #[cfg(target_os = "linux")]
116 2 : fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
117 2 : match e {
118 2 : tokio_epoll_uring::Error::Op(e) => e,
119 0 : tokio_epoll_uring::Error::System(system) => {
120 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
121 : }
122 : }
123 2 : }
124 :
125 : impl IoEngine {
126 496589 : pub(super) async fn read_at<Buf>(
127 496589 : &self,
128 496589 : file_guard: FileGuard,
129 496589 : offset: u64,
130 496589 : mut slice: tokio_epoll_uring::Slice<Buf>,
131 496589 : ) -> (
132 496589 : (FileGuard, tokio_epoll_uring::Slice<Buf>),
133 496589 : std::io::Result<usize>,
134 496589 : )
135 496589 : where
136 496589 : Buf: tokio_epoll_uring::IoBufMut + Send,
137 496589 : {
138 496589 : match self {
139 0 : IoEngine::NotSet => panic!("not initialized"),
140 : IoEngine::StdFs => {
141 248346 : let rust_slice = slice.as_mut_rust_slice_full_zeroed();
142 248346 : let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
143 248346 : ((file_guard, slice), res)
144 : }
145 : #[cfg(target_os = "linux")]
146 : IoEngine::TokioEpollUring => {
147 248243 : let system = tokio_epoll_uring_ext::thread_local_system().await;
148 248265 : let (resources, res) = system.read(file_guard, offset, slice).await;
149 248243 : (resources, res.map_err(epoll_uring_error_to_std))
150 : }
151 : }
152 496589 : }
153 2687 : pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
154 2687 : match self {
155 0 : IoEngine::NotSet => panic!("not initialized"),
156 : IoEngine::StdFs => {
157 1342 : let res = file_guard.with_std_file(|std_file| std_file.sync_all());
158 1342 : (file_guard, res)
159 : }
160 : #[cfg(target_os = "linux")]
161 : IoEngine::TokioEpollUring => {
162 1345 : let system = tokio_epoll_uring_ext::thread_local_system().await;
163 1345 : let (resources, res) = system.fsync(file_guard).await;
164 1345 : (resources, res.map_err(epoll_uring_error_to_std))
165 : }
166 : }
167 2687 : }
168 0 : pub(super) async fn sync_data(
169 0 : &self,
170 0 : file_guard: FileGuard,
171 0 : ) -> (FileGuard, std::io::Result<()>) {
172 0 : match self {
173 0 : IoEngine::NotSet => panic!("not initialized"),
174 : IoEngine::StdFs => {
175 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_data());
176 0 : (file_guard, res)
177 : }
178 : #[cfg(target_os = "linux")]
179 : IoEngine::TokioEpollUring => {
180 0 : let system = tokio_epoll_uring_ext::thread_local_system().await;
181 0 : let (resources, res) = system.fdatasync(file_guard).await;
182 0 : (resources, res.map_err(epoll_uring_error_to_std))
183 : }
184 : }
185 0 : }
186 1684 : pub(super) async fn metadata(
187 1684 : &self,
188 1684 : file_guard: FileGuard,
189 1684 : ) -> (FileGuard, std::io::Result<Metadata>) {
190 1684 : match self {
191 0 : IoEngine::NotSet => panic!("not initialized"),
192 : IoEngine::StdFs => {
193 842 : let res =
194 842 : file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
195 842 : (file_guard, res)
196 : }
197 : #[cfg(target_os = "linux")]
198 : IoEngine::TokioEpollUring => {
199 842 : let system = tokio_epoll_uring_ext::thread_local_system().await;
200 842 : let (resources, res) = system.statx(file_guard).await;
201 842 : (
202 842 : resources,
203 842 : res.map_err(epoll_uring_error_to_std).map(Metadata::from),
204 842 : )
205 : }
206 : }
207 1684 : }
208 1126780 : pub(super) async fn write_at<B: IoBuf + Send>(
209 1126780 : &self,
210 1126780 : file_guard: FileGuard,
211 1126780 : offset: u64,
212 1126780 : buf: FullSlice<B>,
213 1126780 : ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
214 1126780 : match self {
215 0 : IoEngine::NotSet => panic!("not initialized"),
216 : IoEngine::StdFs => {
217 563395 : let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
218 563395 : ((file_guard, buf), result)
219 : }
220 : #[cfg(target_os = "linux")]
221 : IoEngine::TokioEpollUring => {
222 563385 : let system = tokio_epoll_uring_ext::thread_local_system().await;
223 563385 : let ((file_guard, slice), res) =
224 563386 : system.write(file_guard, offset, buf.into_raw_slice()).await;
225 563385 : (
226 563385 : (file_guard, FullSlice::must_new(slice)),
227 563385 : res.map_err(epoll_uring_error_to_std),
228 563385 : )
229 : }
230 : }
231 1126780 : }
232 :
233 : /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
234 : /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
235 : /// whereas before the switch to [`super::io_engine`], that wasn't the case.
236 : /// This method helps avoid such a regression.
237 : ///
238 : /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
239 6 : pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
240 6 : where
241 6 : Fut: 'static + Send + std::future::Future<Output = R>,
242 6 : R: 'static + Send,
243 6 : {
244 6 : match self {
245 0 : IoEngine::NotSet => panic!("not initialized"),
246 : IoEngine::StdFs => {
247 3 : let span = tracing::info_span!("spawn_blocking_block_on_if_std");
248 3 : tokio::task::spawn_blocking({
249 3 : move || tokio::runtime::Handle::current().block_on(work.instrument(span))
250 3 : })
251 3 : .await
252 3 : .expect("failed to join blocking code most likely it panicked, panicking as well")
253 : }
254 : #[cfg(target_os = "linux")]
255 6 : IoEngine::TokioEpollUring => work.await,
256 : }
257 6 : }
258 : }
259 :
/// Outcome of [`feature_test`]: which [`IoEngineKind`] to use on this machine.
pub enum FeatureTestResult {
    /// The platform's preferred engine works and should be used.
    PlatformPreferred(IoEngineKind),
    /// The preferred engine is unavailable, so a less desirable engine is used instead.
    Worse {
        /// The fallback engine to use.
        engine: IoEngineKind,
        /// Human-readable explanation of why the preferred engine could not be used.
        remark: String,
    },
}
267 :
impl FeatureTestResult {
    /// The engine that [`feature_test`] reports as `PlatformPreferred` when it is usable:
    /// `tokio-epoll-uring` on Linux.
    #[cfg(target_os = "linux")]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
    /// Elsewhere, only the `std::fs`-based engine exists, so it is the preferred one.
    #[cfg(not(target_os = "linux"))]
    const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
}
274 :
275 : impl From<FeatureTestResult> for IoEngineKind {
276 0 : fn from(val: FeatureTestResult) -> Self {
277 0 : match val {
278 0 : FeatureTestResult::PlatformPreferred(e) => e,
279 0 : FeatureTestResult::Worse { engine, .. } => engine,
280 : }
281 0 : }
282 : }
283 :
284 : /// Somewhat costly under the hood, do only once.
285 : /// Panics if we can't set up the feature test.
286 16 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
287 16 : std::thread::spawn(|| {
288 16 :
289 16 : #[cfg(not(target_os = "linux"))]
290 16 : {
291 16 : Ok(FeatureTestResult::PlatformPreferred(
292 16 : FeatureTestResult::PLATFORM_PREFERRED,
293 16 : ))
294 16 : }
295 16 : #[cfg(target_os = "linux")]
296 16 : {
297 16 : let rt = tokio::runtime::Builder::new_current_thread()
298 16 : .enable_all()
299 16 : .build()
300 16 : .unwrap();
301 16 : Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
302 : Ok(_) => FeatureTestResult::PlatformPreferred({
303 16 : assert!(matches!(
304 16 : IoEngineKind::TokioEpollUring,
305 : FeatureTestResult::PLATFORM_PREFERRED
306 : ));
307 16 : FeatureTestResult::PLATFORM_PREFERRED
308 : }),
309 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
310 0 : let remark = match e.raw_os_error() {
311 : Some(nix::libc::EPERM) => {
312 : // fall back
313 0 : "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
314 0 : .to_string()
315 : }
316 : Some(nix::libc::EFAULT) => {
317 : // fail feature test
318 0 : anyhow::bail!(
319 0 : "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
320 0 : );
321 : }
322 : Some(_) | None => {
323 : // fall back
324 0 : format!("creating tokio-epoll-uring fails with error: {e:#}")
325 : }
326 : };
327 0 : FeatureTestResult::Worse {
328 0 : engine: IoEngineKind::StdFs,
329 0 : remark,
330 0 : }
331 : }
332 : })
333 : }
334 16 : })
335 16 : .join()
336 16 : .unwrap()
337 16 : }
338 :
339 : /// For use in benchmark binaries only.
340 : ///
341 : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
342 : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
343 : /// developer time trying to figure out why it's slow.
344 : ///
345 : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
346 0 : pub fn io_engine_for_bench() -> IoEngineKind {
347 0 : #[cfg(not(target_os = "linux"))]
348 0 : {
349 0 : panic!("This benchmark does I/O and can only give a representative result on Linux");
350 0 : }
351 0 : #[cfg(target_os = "linux")]
352 0 : {
353 0 : match feature_test().unwrap() {
354 0 : FeatureTestResult::PlatformPreferred(engine) => engine,
355 : FeatureTestResult::Worse {
356 0 : engine: _engine,
357 0 : remark,
358 0 : } => {
359 0 : panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
360 : }
361 : }
362 : }
363 0 : }
|