Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 : //!
10 : //!
11 :
12 : #[cfg(target_os = "linux")]
13 : pub(super) mod tokio_epoll_uring_ext;
14 :
15 : use tokio_epoll_uring::IoBuf;
16 : use tracing::Instrument;
17 :
18 : pub(crate) use super::api::IoEngineKind;
/// The IO engine selection as it is kept in the global atomic.
///
/// The enum is `#[repr(u8)]` so that a value can be stored as a plain byte and
/// recovered again through `TryFrom<u8>`.
#[derive(Copy, Clone)]
#[repr(u8)]
pub(crate) enum IoEngine {
    /// No engine has been selected yet.
    NotSet = 0,
    /// IO goes through the `std::fs::File` APIs.
    StdFs = 1,
    /// IO goes through `tokio_epoll_uring`; only exists on Linux.
    #[cfg(target_os = "linux")]
    TokioEpollUring = 2,
}
27 :
28 : impl From<IoEngineKind> for IoEngine {
29 122 : fn from(value: IoEngineKind) -> Self {
30 122 : match value {
31 0 : IoEngineKind::StdFs => IoEngine::StdFs,
32 : #[cfg(target_os = "linux")]
33 122 : IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
34 : }
35 122 : }
36 : }
37 :
38 : impl TryFrom<u8> for IoEngine {
39 : type Error = u8;
40 :
41 304564 : fn try_from(value: u8) -> Result<Self, Self::Error> {
42 304564 : Ok(match value {
43 304564 : v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
44 304442 : v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
45 : #[cfg(target_os = "linux")]
46 304442 : v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
47 0 : x => return Err(x),
48 : })
49 304564 : }
50 : }
51 :
/// Process-wide IO engine selection, stored as the `u8` discriminant of [`IoEngine`].
/// Starts out as [`IoEngine::NotSet`]; written by [`set`] and read by [`get`].
52 : static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
53 :
54 122 : pub(crate) fn set(engine_kind: IoEngineKind) {
55 122 : let engine: IoEngine = engine_kind.into();
56 122 : IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
57 : #[cfg(not(test))]
58 0 : {
59 0 : let metric = &crate::metrics::virtual_file_io_engine::KIND;
60 0 : metric.reset();
61 0 : metric
62 0 : .with_label_values(&[&format!("{engine_kind}")])
63 0 : .set(1);
64 0 : }
65 122 : }
66 :
67 : #[cfg(not(test))]
68 0 : pub(super) fn init(engine_kind: IoEngineKind) {
69 0 : set(engine_kind);
70 0 : }
71 :
72 : /// Longer-term, this API should only be used by [`super::VirtualFile`].
73 304564 : pub(crate) fn get() -> IoEngine {
74 304564 : let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
75 304564 : if cfg!(test) {
76 304564 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
77 304564 : match cur {
78 : IoEngine::NotSet => {
79 122 : let kind = match std::env::var(env_var_name) {
80 122 : Ok(v) => match v.parse::<IoEngineKind>() {
81 122 : Ok(engine_kind) => engine_kind,
82 0 : Err(e) => {
83 0 : panic!(
84 0 : "invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}"
85 : )
86 : }
87 : },
88 : Err(std::env::VarError::NotPresent) => {
89 : #[cfg(target_os = "linux")]
90 : {
91 0 : IoEngineKind::TokioEpollUring
92 : }
93 : #[cfg(not(target_os = "linux"))]
94 : {
95 : IoEngineKind::StdFs
96 : }
97 : }
98 : Err(std::env::VarError::NotUnicode(_)) => {
99 0 : panic!("env var {env_var_name} is not unicode");
100 : }
101 : };
102 122 : self::set(kind);
103 122 : self::get()
104 : }
105 304442 : x => x,
106 : }
107 : } else {
108 0 : cur
109 : }
110 304564 : }
111 :
112 : use std::os::unix::prelude::FileExt;
113 : use std::sync::atomic::{AtomicU8, Ordering};
114 : #[cfg(target_os = "linux")]
115 : use {std::time::Duration, tracing::info};
116 :
117 : use super::owned_buffers_io::io_buf_ext::FullSlice;
118 : use super::owned_buffers_io::slice::SliceMutExt;
119 : use super::{FileGuard, Metadata};
120 :
121 : #[cfg(target_os = "linux")]
122 0 : pub(super) fn epoll_uring_error_to_std(
123 0 : e: tokio_epoll_uring::Error<std::io::Error>,
124 0 : ) -> std::io::Error {
125 0 : match e {
126 0 : tokio_epoll_uring::Error::Op(e) => e,
127 0 : tokio_epoll_uring::Error::System(system) => std::io::Error::other(system),
128 : }
129 0 : }
130 :
131 : impl IoEngine {
132 278582 : pub(super) async fn read_at<Buf>(
133 278582 : &self,
134 278582 : file_guard: FileGuard,
135 278582 : offset: u64,
136 278582 : mut slice: tokio_epoll_uring::Slice<Buf>,
137 278582 : ) -> (
138 278582 : (FileGuard, tokio_epoll_uring::Slice<Buf>),
139 278582 : std::io::Result<usize>,
140 278582 : )
141 278582 : where
142 278582 : Buf: tokio_epoll_uring::IoBufMut + Send,
143 278582 : {
144 278582 : match self {
145 0 : IoEngine::NotSet => panic!("not initialized"),
146 : IoEngine::StdFs => {
147 0 : let rust_slice = slice.as_mut_rust_slice_full_zeroed();
148 0 : let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
149 0 : ((file_guard, slice), res)
150 : }
151 : #[cfg(target_os = "linux")]
152 : IoEngine::TokioEpollUring => {
153 278582 : let system = tokio_epoll_uring_ext::thread_local_system().await;
154 278582 : let (resources, res) =
155 278582 : retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
156 278582 : system.read(file_guard, offset, slice).await
157 557164 : })
158 278582 : .await;
159 278582 : (resources, res.map_err(epoll_uring_error_to_std))
160 : }
161 : }
162 278582 : }
163 1452 : pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
164 1452 : match self {
165 0 : IoEngine::NotSet => panic!("not initialized"),
166 : IoEngine::StdFs => {
167 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_all());
168 0 : (file_guard, res)
169 : }
170 : #[cfg(target_os = "linux")]
171 : IoEngine::TokioEpollUring => {
172 1452 : let system = tokio_epoll_uring_ext::thread_local_system().await;
173 1452 : let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
174 1452 : system.fsync(file_guard).await
175 2904 : })
176 1452 : .await;
177 1452 : (resources, res.map_err(epoll_uring_error_to_std))
178 : }
179 : }
180 1452 : }
181 0 : pub(super) async fn sync_data(
182 0 : &self,
183 0 : file_guard: FileGuard,
184 0 : ) -> (FileGuard, std::io::Result<()>) {
185 0 : match self {
186 0 : IoEngine::NotSet => panic!("not initialized"),
187 : IoEngine::StdFs => {
188 0 : let res = file_guard.with_std_file(|std_file| std_file.sync_data());
189 0 : (file_guard, res)
190 : }
191 : #[cfg(target_os = "linux")]
192 : IoEngine::TokioEpollUring => {
193 0 : let system = tokio_epoll_uring_ext::thread_local_system().await;
194 0 : let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
195 0 : system.fdatasync(file_guard).await
196 0 : })
197 0 : .await;
198 0 : (resources, res.map_err(epoll_uring_error_to_std))
199 : }
200 : }
201 0 : }
202 930 : pub(super) async fn metadata(
203 930 : &self,
204 930 : file_guard: FileGuard,
205 930 : ) -> (FileGuard, std::io::Result<Metadata>) {
206 930 : match self {
207 0 : IoEngine::NotSet => panic!("not initialized"),
208 : IoEngine::StdFs => {
209 0 : let res =
210 0 : file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
211 0 : (file_guard, res)
212 : }
213 : #[cfg(target_os = "linux")]
214 : IoEngine::TokioEpollUring => {
215 930 : let system = tokio_epoll_uring_ext::thread_local_system().await;
216 930 : let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
217 930 : system.statx(file_guard).await
218 1860 : })
219 930 : .await;
220 930 : (
221 930 : resources,
222 930 : res.map_err(epoll_uring_error_to_std).map(Metadata::from),
223 930 : )
224 : }
225 : }
226 930 : }
227 :
228 7 : pub(super) async fn set_len(
229 7 : &self,
230 7 : file_guard: FileGuard,
231 7 : len: u64,
232 7 : ) -> (FileGuard, std::io::Result<()>) {
233 7 : match self {
234 0 : IoEngine::NotSet => panic!("not initialized"),
235 : IoEngine::StdFs => {
236 0 : let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
237 0 : (file_guard, res)
238 : }
239 : #[cfg(target_os = "linux")]
240 : IoEngine::TokioEpollUring => {
241 : // TODO: ftruncate op for tokio-epoll-uring
242 : // Don't forget to use retry_ecanceled_once
243 7 : let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
244 7 : (file_guard, res)
245 : }
246 : }
247 7 : }
248 :
249 19808 : pub(super) async fn write_at<B: IoBuf + Send>(
250 19808 : &self,
251 19808 : file_guard: FileGuard,
252 19808 : offset: u64,
253 19808 : buf: FullSlice<B>,
254 19808 : ) -> ((FileGuard, FullSlice<B>), std::io::Result<usize>) {
255 19808 : match self {
256 0 : IoEngine::NotSet => panic!("not initialized"),
257 : IoEngine::StdFs => {
258 0 : let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
259 0 : ((file_guard, buf), result)
260 : }
261 : #[cfg(target_os = "linux")]
262 : IoEngine::TokioEpollUring => {
263 19808 : let system = tokio_epoll_uring_ext::thread_local_system().await;
264 19808 : let ((file_guard, slice), res) = retry_ecanceled_once(
265 19808 : (file_guard, buf.into_raw_slice()),
266 0 : async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
267 : )
268 19808 : .await;
269 19808 : (
270 19808 : (file_guard, FullSlice::must_new(slice)),
271 19808 : res.map_err(epoll_uring_error_to_std),
272 19808 : )
273 : }
274 : }
275 19808 : }
276 :
277 : /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
278 : /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
279 : /// whereas before the switch to [`super::io_engine`], that wasn't the case.
280 : /// This method helps avoid such a regression.
281 : ///
282 : /// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
283 7 : pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
284 7 : where
285 7 : Fut: 'static + Send + std::future::Future<Output = R>,
286 7 : R: 'static + Send,
287 7 : {
288 7 : match self {
289 0 : IoEngine::NotSet => panic!("not initialized"),
290 : IoEngine::StdFs => {
291 0 : let span = tracing::info_span!("spawn_blocking_block_on_if_std");
292 0 : tokio::task::spawn_blocking({
293 0 : move || tokio::runtime::Handle::current().block_on(work.instrument(span))
294 : })
295 0 : .await
296 0 : .expect("failed to join blocking code most likely it panicked, panicking as well")
297 : }
298 : #[cfg(target_os = "linux")]
299 7 : IoEngine::TokioEpollUring => work.await,
300 : }
301 7 : }
302 : }
303 :
304 : /// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
305 : /// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
306 : /// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
307 : /// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
308 : ///
309 : /// This function retries the operation once if it fails with ECANCELED.
310 : /// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
311 : #[cfg(target_os = "linux")]
312 399794 : pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
313 399794 : resources: T,
314 399794 : f: F,
315 399794 : ) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
316 399794 : where
317 399794 : F: Fn(T) -> Fut,
318 399794 : Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
319 399794 : T: Send,
320 399794 : V: Send,
321 399794 : {
322 399794 : let (resources, res) = f(resources).await;
323 399794 : let Err(e) = res else {
324 399794 : return (resources, res);
325 : };
326 0 : let tokio_epoll_uring::Error::Op(err) = e else {
327 0 : return (resources, Err(e));
328 : };
329 0 : if err.raw_os_error() != Some(nix::libc::ECANCELED) {
330 0 : return (resources, Err(tokio_epoll_uring::Error::Op(err)));
331 0 : }
332 : {
333 : static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
334 : std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
335 0 : let mut guard = RATE_LIMIT.lock().unwrap();
336 0 : guard.call2(|rate_limit_stats| {
337 0 : info!(
338 0 : %rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
339 : );
340 0 : });
341 0 : drop(guard);
342 : }
343 0 : tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
344 0 : let (resources, res) = f(resources).await;
345 0 : (resources, res)
346 399794 : }
347 :
348 0 : pub(super) fn panic_operation_must_be_idempotent() {
349 0 : panic!(
350 0 : "unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
351 : )
352 : }
353 :
354 : pub enum FeatureTestResult {
355 : PlatformPreferred(IoEngineKind),
356 : Worse {
357 : engine: IoEngineKind,
358 : remark: String,
359 : },
360 : }
361 :
362 : impl FeatureTestResult {
363 : #[cfg(target_os = "linux")]
364 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
365 : #[cfg(not(target_os = "linux"))]
366 : const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
367 : }
368 :
369 : impl From<FeatureTestResult> for IoEngineKind {
370 0 : fn from(val: FeatureTestResult) -> Self {
371 0 : match val {
372 0 : FeatureTestResult::PlatformPreferred(e) => e,
373 0 : FeatureTestResult::Worse { engine, .. } => engine,
374 : }
375 0 : }
376 : }
377 :
378 : /// Somewhat costly under the hood, do only once.
379 : /// Panics if we can't set up the feature test.
380 138 : pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
381 138 : std::thread::spawn(|| {
382 :
383 : #[cfg(not(target_os = "linux"))]
384 : {
385 : Ok(FeatureTestResult::PlatformPreferred(
386 : FeatureTestResult::PLATFORM_PREFERRED,
387 : ))
388 : }
389 : #[cfg(target_os = "linux")]
390 : {
391 138 : let rt = tokio::runtime::Builder::new_current_thread()
392 138 : .enable_all()
393 138 : .build()
394 138 : .unwrap();
395 138 : Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
396 : Ok(_) => FeatureTestResult::PlatformPreferred({
397 138 : assert!(matches!(
398 138 : IoEngineKind::TokioEpollUring,
399 : FeatureTestResult::PLATFORM_PREFERRED
400 : ));
401 138 : FeatureTestResult::PLATFORM_PREFERRED
402 : }),
403 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
404 0 : let remark = match e.raw_os_error() {
405 : Some(nix::libc::EPERM) => {
406 : // fall back
407 0 : "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
408 0 : .to_string()
409 : }
410 : Some(nix::libc::EFAULT) => {
411 : // fail feature test
412 0 : anyhow::bail!(
413 0 : "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
414 : );
415 : }
416 : Some(_) | None => {
417 : // fall back
418 0 : format!("creating tokio-epoll-uring fails with error: {e:#}")
419 : }
420 : };
421 0 : FeatureTestResult::Worse {
422 0 : engine: IoEngineKind::StdFs,
423 0 : remark,
424 0 : }
425 : }
426 : })
427 : }
428 138 : })
429 138 : .join()
430 138 : .unwrap()
431 138 : }
432 :
433 : /// For use in benchmark binaries only.
434 : ///
435 : /// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
436 : /// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
437 : /// developer time trying to figure out why it's slow.
438 : ///
439 : /// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
440 0 : pub fn io_engine_for_bench() -> IoEngineKind {
441 : #[cfg(not(target_os = "linux"))]
442 : {
443 : panic!("This benchmark does I/O and can only give a representative result on Linux");
444 : }
445 : #[cfg(target_os = "linux")]
446 : {
447 0 : match feature_test().unwrap() {
448 0 : FeatureTestResult::PlatformPreferred(engine) => engine,
449 : FeatureTestResult::Worse {
450 0 : engine: _engine,
451 0 : remark,
452 : } => {
453 0 : panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
454 : }
455 : }
456 : }
457 0 : }
|